Local Mode (luigi.Task)
    import datetime

    import luigi
    import luigi.contrib.hdfs


    class DataDump(luigi.ExternalTask):
        date = luigi.DateParameter()

        def output(self):
            return luigi.contrib.hdfs.HdfsTarget(self.date.strftime('/var/log/dump/%Y-%m-%d.txt'))


    class AggregationTask(luigi.Task):
        date = luigi.DateParameter()
        window = luigi.IntParameter()

        def requires(self):
            # One dump per day in the aggregation window.
            return [DataDump(self.date - datetime.timedelta(i)) for i in range(self.window)]

        def run(self):
            run_some_cool_stuff(self.input())

        def output(self):
            return luigi.contrib.hdfs.HdfsTarget('/aggregated-%s-%d' % (self.date, self.window))


    class RunLocal(luigi.Task):
        def requires(self):
            # Guarantee that aggregations were run for the past 10 days,
            # for each window size.
            for window in [3, 7, 14]:
                for d in range(10):
                    yield AggregationTask(datetime.date.today() - datetime.timedelta(d), window)
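A pipeline like this is typically kicked off either from the command line or programmatically with luigi.build. A minimal sketch, assuming the tasks above live in a hypothetical module named my_tasks.py:

    # Hypothetical entry point; the module name is an assumption.
    # The equivalent command line would be:
    #   python -m luigi --module my_tasks RunLocal --local-scheduler
    import luigi

    from my_tasks import RunLocal

    if __name__ == '__main__':
        # local_scheduler=True avoids needing a running luigid daemon.
        luigi.build([RunLocal()], local_scheduler=True)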
Multiple Local Tasks (luigi.WrapperTask)
    import datetime
    import luigi


    class AllReports(luigi.WrapperTask):
        date = luigi.DateParameter(default=datetime.date.today())

        def requires(self):
            # A WrapperTask is complete once all of its requirements are,
            # so it needs no run() or output() of its own.
            yield SomeReport(self.date)
            yield SomeOtherReport(self.date)
            yield CropReport(self.date)
            yield TPSReport(self.date)
            yield FooBarBazReport(self.date)
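The report tasks referenced above are placeholders. A minimal sketch of what one of them might look like (the class name comes from the example above; everything inside it is assumed):

    import luigi


    class SomeReport(luigi.Task):
        # Hypothetical body for one of the placeholder reports above.
        date = luigi.DateParameter()

        def output(self):
            return luigi.LocalTarget(self.date.strftime('/tmp/reports/some-report-%Y-%m-%d.txt'))

        def run(self):
            with self.output().open('w') as f:
                f.write('report for %s\n' % self.date)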
Hadoop Mode (luigi.contrib.hadoop.JobTask)
    import luigi
    import luigi.contrib.hadoop
    import luigi.contrib.hdfs


    class AggregateArtistsHadoop(luigi.contrib.hadoop.JobTask):
        date_interval = luigi.DateIntervalParameter()

        def output(self):
            return luigi.contrib.hdfs.HdfsTarget("data/artist_streams_%s.tsv" % self.date_interval, format=luigi.contrib.hdfs.PlainDir)

        def requires(self):
            return [StreamsHdfs(date) for date in self.date_interval]

        def mapper(self, line):
            # Each input line is "<timestamp> <artist> <track>"; emit one count per play.
            _, artist, _ = line.strip().split()
            yield artist, 1

        def reducer(self, key, values):
            yield key, sum(values)
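A JobTask is submitted via Hadoop streaming, so luigi must be told where the streaming jar lives. A sketch of the relevant configuration section (the path is a placeholder; check the exact key name against your luigi version's configuration docs):

    [hadoop]
    # Placeholder path -- point this at your Hadoop streaming jar.
    streaming-jar: /usr/lib/hadoop-mapreduce/hadoop-streaming.jar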
Spark Mode (luigi.contrib.spark.SparkSubmitTask)
https://github.com/spotify/luigi/blob/master/examples/spark_als.py
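The linked example submits a compiled Spark application. A condensed sketch of the shape such a task takes (the jar name, entry class, paths, and upstream task here are placeholders, not the linked file verbatim):

    import luigi
    import luigi.contrib.hdfs
    from luigi.contrib.spark import SparkSubmitTask


    class MySparkJob(SparkSubmitTask):
        # Placeholder jar and entry class; the linked example points these
        # at Spotify's ALS job.
        app = 'my-spark-assembly.jar'
        entry_class = 'com.example.MySparkJob'
        driver_memory = '2g'
        executor_memory = '3g'

        def app_options(self):
            # Arguments passed through to the Spark application's main().
            return [self.input().path, self.output().path]

        def requires(self):
            return SomeHdfsInput()  # hypothetical upstream task

        def output(self):
            return luigi.contrib.hdfs.HdfsTarget('/tmp/my-spark-job-output')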
PySpark Mode (luigi.contrib.spark.PySparkTask)
https://github.com/spotify/luigi/blob/master/examples/pyspark_wc.py
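The linked word-count example overrides PySparkTask's main(), which receives a live SparkContext. A condensed sketch (the output path and upstream task are placeholders):

    import luigi
    import luigi.contrib.hdfs
    from luigi.contrib.spark import PySparkTask


    class WordCount(PySparkTask):
        driver_memory = '2g'
        executor_memory = '3g'

        def requires(self):
            return InputText()  # hypothetical upstream task producing a text file

        def output(self):
            return luigi.contrib.hdfs.HdfsTarget('/tmp/wordcount')

        def main(self, sc, *args):
            # sc is the SparkContext luigi creates for this task.
            sc.textFile(self.input().path) \
              .flatMap(lambda line: line.split()) \
              .map(lambda word: (word, 1)) \
              .reduceByKey(lambda a, b: a + b) \
              .saveAsTextFile(self.output().path)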