It sounds like you want something like this?

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator

    # DAG id and start date are illustrative placeholders.
    dag = DAG("metric_x_rollups", schedule_interval="@daily",
              start_date=datetime(2018, 8, 1))

    root_operator = DummyOperator(task_id="root", dag=dag)

    def offset_operator(i):
        # Check the partition i days before the run date; ds_add needs a
        # negative offset to go back in time. "my_table" is a placeholder.
        my_sql_query = (
            "SELECT 1 FROM my_table "
            "WHERE ds = '{{{{ macros.ds_add(ds, {offset}) }}}}' LIMIT 1;"
        ).format(offset=-i)
        # SQLOperator is a placeholder from the original sketch for whatever
        # operator or sensor polls for the data, not a specific Airflow class.
        return SQLOperator(task_id="offset_by_{}".format(i),
                           query=my_sql_query, dag=dag)

    offset_operators = [offset_operator(i) for i in range(7)]
    root_operator >> offset_operators

    # Daily just waits on today, no offset
    do_daily_work = DummyOperator(task_id="do_daily_work", dag=dag)
    offset_operators[0] >> do_daily_work

    # Weekly waits on today AND the six prior offsets
    do_weekly_work = DummyOperator(task_id="do_weekly_work", dag=dag)
    offset_operators >> do_weekly_work

IOW, every day you wait for that day's data to be available, and then run the daily job; you also wait for the previous six days' data to be available, and when it is, run the weekly job.

n.b. - if you do it this way you will have up to 7 tasks polling the "same" data point, which is slightly wasteful. But it's also not much code or mental effort to write it this way.

On Wed, Aug 8, 2018 at 2:44 PM Gabriel Silk <gs...@dropbox.com.invalid> wrote:

> My main concern is how to express the fact that the weekly rollup depends on the previous 7 days' worth of data, and to ensure that it does not run until the tasks that generate those 7 days of data have run, assuming that tasks can run non-sequentially.
>
> It's easy enough when you have the following situation:
>
> (daily log ingestion) <-- (daily rollup)
>
> In any given DAG run, you are guaranteed to have the data needed for (daily rollup), because the dependency that generated its data just ran.
>
> But I'm not sure how best to model it when you have all of the following:
>
> (daily log ingestion) <-- (daily rollup)
> (daily log ingestion) <-- (weekly rollup)
> (daily log ingestion) <-- (monthly rollup)
>
> On Wed, Aug 8, 2018 at 11:29 AM, Taylor Edmiston <tedmis...@gmail.com> wrote:
>
> > Gabriel -
> >
> > Ah, I missed your earlier comment about weekly/monthly rollups also being on a daily cadence.
> > So is your concern, e.g., more about reducing the redundant processing of the weekly rollup tasks for the days of that range that were already processed in the previous DAG run(s)? Or mainly about the dependency of not executing the first weekly at all until the first 7 daily rollups' worth of data have built up?
> >
> > *Taylor Edmiston*
> > Blog <https://blog.tedmiston.com/> | CV <https://stackoverflow.com/cv/taylor> | LinkedIn <https://www.linkedin.com/in/tedmiston/> | AngelList <https://angel.co/taylor> | Stack Overflow <https://stackoverflow.com/users/149428/taylor-edmiston>
> >
> > On Wed, Aug 8, 2018 at 2:14 PM, James Meickle <jmeic...@quantopian.com.invalid> wrote:
> >
> > > If you want to run (daily, rolling weekly, rolling monthly) backups on a daily basis, and they're mostly the same but have some additional dependencies, you can write a DAG factory method, which you call three times. Certain nodes only get added to the longer-than-daily backups.
> > >
> > > On Wed, Aug 8, 2018 at 2:03 PM Gabriel Silk <gs...@dropbox.com.invalid> wrote:
> > >
> > > > Thanks Andy and Taylor for the suggestions --
> > > >
> > > > I see how that would work for the case where you want a weekly rollup that runs on a weekly cadence.
> > > >
> > > > But what about a rolling weekly or monthly rollup that runs each day?
> > > >
> > > > On Wed, Aug 8, 2018 at 11:00 AM, Andy Cooper <andy.coo...@astronomer.io> wrote:
> > > >
> > > > > To expand on Taylor's idea
> > > > >
> > > > > I recently wrote a ScheduleBlackoutSensor that would allow you to prevent a task from running if it meets the criteria provided. It accepts an array of args for any number of the criteria, so you could leverage this sensor to provide "blackout" runs for a range of days of the week.
> > > > >
> > > > > https://github.com/apache/incubator-airflow/pull/3702/files
> > > > >
> > > > > For example,
> > > > >
> > > > > task = ScheduleBlackoutSensor(day_of_week=[0,1,2,3,4,5], dag=dag)
> > > > >
> > > > > would prevent a task from running Monday through Saturday, allowing it to run on Sunday.
> > > > >
> > > > > You could leverage this sensor as you would any other sensor, or you could invert the logic so that you would only need to specify
> > > > >
> > > > > task = ScheduleBlackoutSensor(day_of_week=6, dag=dag)
> > > > >
> > > > > to "whitelist" a task to run on Sundays.
> > > > >
> > > > > Let me know if you have any questions
> > > > >
> > > > > On Wed, Aug 8, 2018 at 1:47 PM Taylor Edmiston <tedmis...@gmail.com> wrote:
> > > > >
> > > > > > Gabriel -
> > > > > >
> > > > > > One approach I've seen for a similar use case is to have multiple related rollups in one DAG that runs daily, then have the non-daily tasks skip most of the time (e.g., weekly only actually executes on Sundays and is parameterized to look at the last 7 days).
> > > > > >
> > > > > > You could implement that not-running part a few ways, but one idea is a sensor in front of the weekly rollup task. Imagine a SundaySensor whose check is: return execution_date.weekday() == 6. One thing to keep in mind here is the dependence on the DAG's cron schedule being more granular than the tasks'.
> > > > > >
> > > > > > I think this could generalize into a DayOfWeekSensor / DayOfMonthSensor that would be nice to have.
> > > > > >
> > > > > > Of course this does mean some scheduler inefficiency on the skip days, but as long as those skips are fast and the overall number of tasks is small, I can accept that.
> > > > > >
> > > > > > On Wed, Aug 8, 2018 at 1:11 PM, Gabriel Silk <gs...@dropbox.com.invalid> wrote:
> > > > > >
> > > > > > > Hello Airflow community,
> > > > > > >
> > > > > > > I have a basic question about how best to model a common data pipeline pattern here at Dropbox.
> > > > > > >
> > > > > > > At Dropbox, all of our logs are ingested and written into Hive in hourly and/or daily rollups. On top of this data we build many weekly and monthly rollups, which typically run on a daily cadence and compute results over a rolling window.
> > > > > > >
> > > > > > > If we have a metric X, it seems natural to put the daily, weekly, and monthly rollups for metric X all in the same DAG.
> > > > > > >
> > > > > > > However, the different rollups have different dependency structures. The daily job only depends on a single day partition, whereas the weekly job depends on 7, the monthly on 28.
> > > > > > >
> > > > > > > In Airflow, it seems the two paradigms for modeling dependencies are:
> > > > > > > 1) Depend on a *single run of a task* within the same DAG
> > > > > > > 2) Depend on *multiple runs of a task* by using an ExternalTaskSensor
> > > > > > >
> > > > > > > I'm not sure how I could possibly model this scenario using approach #1, and I'm not sure approach #2 is the most elegant or performant way to model this scenario.
> > > > > > >
> > > > > > > Any thoughts or suggestions?
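A minimal, dependency-free sketch of the date arithmetic behind the approaches in this thread: which daily partitions a rolling window ending on the run date needs (1 for daily, 7 for rolling weekly, 28 for rolling monthly, matching the offsets 0 through 6 in the reply at the top), plus a weekday check like the proposed SundaySensor. The helper names required_partitions and is_sunday are hypothetical, not Airflow APIs; the sketch assumes daily partitions keyed by "YYYY-MM-DD" strings (the format of Airflow's ds) and uses only Python's datetime module.

```python
from datetime import date, timedelta
from typing import List


# Hypothetical helper (not an Airflow API): the daily partitions, as
# "YYYY-MM-DD" strings, that a rolling rollup ending on run_date needs.
# window_days=1 is the daily rollup, 7 the rolling weekly, 28 the monthly.
def required_partitions(run_date: date, window_days: int) -> List[str]:
    if window_days < 1:
        raise ValueError("window_days must be at least 1")
    # Offset 0 is the run date itself; offsets 1..window_days-1 are prior
    # days, the same idea as ds_add(ds, -i) in the templated query.
    return [(run_date - timedelta(days=offset)).isoformat()
            for offset in range(window_days)]


# Hypothetical gate in the spirit of the SundaySensor idea:
# date.weekday() returns 0 for Monday through 6 for Sunday.
def is_sunday(run_date: date) -> bool:
    return run_date.weekday() == 6


if __name__ == "__main__":
    run = date(2018, 8, 8)  # a Wednesday
    for name, window in (("daily", 1), ("weekly", 7), ("monthly", 28)):
        parts = required_partitions(run, window)
        print(name, parts[-1], "->", parts[0], "(%d partitions)" % len(parts))
    print("run the Sunday-only weekly task today?", is_sunday(run))
```

Each task in the offset approach polls exactly one entry of this list, so the weekly rollup's upstream set is required_partitions(run, 7); swapping in 28 gives the monthly fan-in without changing the structure.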