Local Mode (luigi.Task)

class DataDump(luigi.ExternalTask):
    """External dependency: the raw log dump for a single day.

    No ``run()`` — the file is produced by some outside process; this task
    only declares where it is expected to live on HDFS.
    """
    date = luigi.DateParameter()

    def output(self):
        # Path encodes the date, e.g. /var/log/dump/2024-01-31.txt
        dump_path = self.date.strftime('/var/log/dump/%Y-%m-%d.txt')
        return luigi.contrib.hdfs.HdfsTarget(dump_path)

class AggregationTask(luigi.Task):
    """Aggregate the daily dumps for a trailing window ending at ``date``.

    Requires one :class:`DataDump` per day in the window and writes a single
    aggregated HDFS target whose path encodes both parameters.
    """
    date = luigi.DateParameter()
    window = luigi.IntParameter()

    def requires(self):
        # One dump per day, going `window` days back from `date` (inclusive).
        # NOTE: `xrange` was Python-2-only; `range` works on both 2 and 3.
        return [DataDump(self.date - datetime.timedelta(i)) for i in range(self.window)]

    def run(self):
        # self.input() is the list of HdfsTargets produced by requires().
        run_some_cool_stuff(self.input())

    def output(self):
        # Output path is keyed on (date, window) so distinct parameterizations
        # never collide.
        return luigi.contrib.hdfs.HdfsTarget('/aggregated-%s-%d' % (self.date, self.window))

class RunLocal(luigi.Task):
    """Driver task: ensure aggregations exist for several windows and days.

    Yields an :class:`AggregationTask` for each (window, day) combination so
    that the past 10 days are covered for 3-, 7- and 14-day windows.
    """
    def requires(self):
        for window in [3, 7, 14]:
            # Guarantee that aggregations were run for the past 10 days.
            # NOTE: `xrange` was Python-2-only; `range` works on both 2 and 3.
            for d in range(10):
                yield AggregationTask(datetime.date.today() - datetime.timedelta(d), window)

Multiple Local Task Examples

class AllReports(luigi.WrapperTask):
    """Umbrella task that fans out to every report for a given date.

    A WrapperTask has no output of its own; it is complete when all of its
    requirements are complete.
    """
    # NOTE(review): `datetime.date.today()` is evaluated once, at module
    # import time — in a long-running scheduler process the default will go
    # stale. Verify this is the intended behavior for this deployment.
    date = luigi.DateParameter(default=datetime.date.today())

    def requires(self):
        yield SomeReport(self.date)
        yield SomeOtherReport(self.date)
        yield CropReport(self.date)
        yield TPSReport(self.date)
        yield FooBarBazReport(self.date)

Hadoop Mode (luigi.contrib.hadoop.JobTask)

class AggregateArtistsHadoop(luigi.contrib.hadoop.JobTask):
    """Hadoop streaming job: count plays per artist over a date interval.

    mapper/reducer are shipped to the cluster by the JobTask machinery.
    """
    # FIX: this attribute was indented 2 spaces while the methods were at 4,
    # which is an IndentationError — aligned to the class body level.
    date_interval = luigi.DateIntervalParameter()

    def output(self):
        # PlainDir: the target is the whole output directory, as produced by
        # a Hadoop job (part-* files), not a single file.
        return luigi.contrib.hdfs.HdfsTarget("data/artist_streams_%s.tsv" % self.date_interval, format=luigi.contrib.hdfs.PlainDir)

    def requires(self):
        # One StreamsHdfs task per day in the interval.
        return [StreamsHdfs(date) for date in self.date_interval]

    def mapper(self, line):
        # Input lines look like "<timestamp> <artist> <track>"; emit a count
        # of 1 per play, keyed by artist.
        _, artist, _ = line.strip().split()

        yield artist, 1

    def reducer(self, key, values):
        # Sum the per-play counts for each artist.
        yield key, sum(values)

Spark Mode (luigi.contrib.spark.SparkSubmitTask)

https://github.com/spotify/luigi/blob/master/examples/spark_als.py

PySpark Mode (luigi.contrib.spark.SparkSubmitTask, PySparkTask)

https://github.com/spotify/luigi/blob/master/examples/pyspark_wc.py

Next step: query the database and save the results locally.

results matching ""

    No results matching ""