Not sure if it’s optimal compared to what James proposes, but I would simply have made the weekly and monthly rollup tasks downstream of the daily log ingestion tasks they depend on, then used the 'all_done' trigger rule to ensure those rollup tasks start once their parent tasks have completed.

https://airflow.incubator.apache.org/concepts.html#trigger-rules

(daily log ingestion) > (daily rollup)
(daily log ingestion) > (weekly rollup + TriggerRule.all_done)
(daily log ingestion) > (monthly rollup + TriggerRule.all_done)

Cheers,
Alexis
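A minimal sketch of the structure Alexis describes, using 2018-era Airflow imports; the DAG id and task ids are illustrative, not from the thread:

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.utils.trigger_rule import TriggerRule

    dag = DAG("metric_x_rollups", start_date=datetime(2018, 8, 1),
              schedule_interval="@daily")

    ingest = DummyOperator(task_id="daily_log_ingestion", dag=dag)

    daily = DummyOperator(task_id="daily_rollup", dag=dag)
    weekly = DummyOperator(task_id="weekly_rollup",
                           trigger_rule=TriggerRule.ALL_DONE, dag=dag)
    monthly = DummyOperator(task_id="monthly_rollup",
                            trigger_rule=TriggerRule.ALL_DONE, dag=dag)

    # All three rollups hang off the same ingestion task; ALL_DONE means the
    # weekly/monthly tasks fire once ingestion finishes, whatever its state.
    ingest >> daily
    ingest >> weekly
    ingest >> monthly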
On 9 Aug 2018, at 02:57, James Meickle <jmeic...@quantopian.com.INVALID> wrote:

It sounds like you want something like this?

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator

    dag = DAG("rollups", start_date=datetime(2018, 8, 1),
              schedule_interval="@daily")

    root_operator = DummyOperator(task_id="root", dag=dag)

    def offset_operator(i):
        # ds_add is exposed in templates as macros.ds_add(ds, days);
        # a negative offset looks i days into the past
        my_sql_query = "SELECT * FROM {{{{ macros.ds_add(ds, {offset}) }}}};".format(offset=-i)
        # SQLOperator is a stand-in for whichever SQL operator you use
        sql_operator = SQLOperator(task_id="offset_by_{}".format(i),
                                   query=my_sql_query, dag=dag)
        return sql_operator

    offset_operators = [offset_operator(i) for i in range(7)]
    root_operator >> offset_operators

    # Daily just waits on today, no offset
    do_daily_work = DummyOperator(task_id="do_daily_work", dag=dag)
    offset_operators[0] >> do_daily_work

    # Weekly waits on today AND the six prior offsets
    do_weekly_work = DummyOperator(task_id="do_weekly_work", dag=dag)
    offset_operators >> do_weekly_work

In other words: every day you wait for that day's data to be available, and then run the daily job; you also wait for the previous six days' data to be available, and when it is, run the weekly job.

n.b. - if you do it this way you will have up to 7 tasks polling the "same" data point, which is slightly wasteful. But it's also not much code or mental effort to write it this way.

On Wed, Aug 8, 2018 at 2:44 PM Gabriel Silk <gs...@dropbox.com.invalid> wrote:

My main concern is how to express the fact that the weekly rollup depends on the previous 7 days' worth of data, and to ensure that it does not run until the tasks that generate those 7 days of data have run, given that tasks can run non-sequentially.

It's easy enough when you have the following situation:

(daily log ingestion) <-- (daily rollup)

In any given DAG run, you are guaranteed to have the data needed for (daily rollup), because the dependency that generated its data just ran. But I'm not sure how best to model it when you have all of the following:

(daily log ingestion) <-- (daily rollup)
(daily log ingestion) <-- (weekly rollup)
(daily log ingestion) <-- (monthly rollup)

On Wed, Aug 8, 2018 at 11:29 AM, Taylor Edmiston <tedmis...@gmail.com> wrote:

Gabriel -

Ah, I missed your earlier comment about weekly/monthly rollups also being on a daily cadence. So is your concern more about reducing the redundant processing in the weekly rollup for the days of the window that were already handled in previous DAG run(s)? Or mainly about not executing the first weekly rollup at all until the first 7 days' worth of daily rollups have built up?

On Wed, Aug 8, 2018 at 2:14 PM, James Meickle <jmeic...@quantopian.com.invalid> wrote:

If you want to run daily, rolling weekly, and rolling monthly rollups on a daily basis, and they're mostly the same but have some additional dependencies, you can write a DAG factory method, which you call three times. Certain nodes only get added to the longer-than-daily rollups.
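A minimal sketch of the factory approach James mentions; the function name, ids, and the lookback parameter are illustrative assumptions, not from the thread:

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator

    def make_rollup_dag(name, lookback_days):
        dag = DAG("rollup_{}".format(name),
                  start_date=datetime(2018, 8, 1),
                  schedule_interval="@daily")
        ingest = DummyOperator(task_id="daily_log_ingestion", dag=dag)
        rollup = DummyOperator(task_id="{}_rollup".format(name), dag=dag)
        ingest >> rollup
        # Only the longer-than-daily DAGs get the extra wait nodes,
        # e.g. sensors on the additional days of upstream data.
        if lookback_days > 1:
            for i in range(1, lookback_days):
                wait = DummyOperator(task_id="wait_for_day_minus_{}".format(i),
                                     dag=dag)
                wait >> rollup
        return dag

    # The factory is called three times, once per cadence:
    daily_dag = make_rollup_dag("daily", 1)
    weekly_dag = make_rollup_dag("weekly", 7)
    monthly_dag = make_rollup_dag("monthly", 28)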
On Wed, Aug 8, 2018 at 2:03 PM Gabriel Silk <gs...@dropbox.com.invalid> wrote:

Thanks Andy and Taylor for the suggestions -- I see how that would work for the case where you want a weekly rollup that runs on a weekly cadence. But what about a rolling weekly or monthly rollup that runs each day?

On Wed, Aug 8, 2018 at 11:00 AM, Andy Cooper <andy.coo...@astronomer.io> wrote:

To expand on Taylor's idea: I recently wrote a ScheduleBlackoutSensor that prevents a task from running if it meets the criteria provided. It accepts an array of args for each criterion, so you could leverage this sensor to provide "blackout" runs for a range of days of the week.

https://github.com/apache/incubator-airflow/pull/3702/files

For example,

    task = ScheduleBlackoutSensor(day_of_week=[0, 1, 2, 3, 4, 5], dag=dag)

would prevent a task from running Monday through Saturday, allowing it to run on Sunday. You could use this sensor as you would any other, or you could invert the logic so that you would only need to specify

    task = ScheduleBlackoutSensor(day_of_week=6, dag=dag)

to "whitelist" a task to run on Sundays. Let me know if you have any questions.

On Wed, Aug 8, 2018 at 1:47 PM Taylor Edmiston <tedmis...@gmail.com> wrote:

Gabriel -

One approach I've seen for a similar use case is to have multiple related rollups in one DAG that runs daily, then have the non-daily tasks skip most of the time (e.g., the weekly rollup only actually executes on Sundays and is parameterized to look at the last 7 days).

You could implement the not-running part a few ways, but one idea is a sensor in front of the weekly rollup task. Imagine a SundaySensor whose poke method is just: return execution_date.weekday() == 6. One thing to keep in mind here is that this depends on the DAG's cron schedule being more granular than the tasks'. I think this could generalize into a DayOfWeekSensor / DayOfMonthSensor that would be nice to have.

Of course this does mean some scheduler inefficiency on the skip days, but as long as those skips are fast and the overall number of tasks is small, I can accept that.
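A minimal sketch of the DayOfWeekSensor Taylor imagines; this is not a built-in, and the class and ids here are illustrative, written against 1.9-era sensor APIs:

    from airflow.operators.sensors import BaseSensorOperator
    from airflow.utils.decorators import apply_defaults

    class DayOfWeekSensor(BaseSensorOperator):
        @apply_defaults
        def __init__(self, day_of_week, *args, **kwargs):
            super(DayOfWeekSensor, self).__init__(*args, **kwargs)
            self.day_of_week = day_of_week  # Monday == 0 ... Sunday == 6

        def poke(self, context):
            # execution_date is fixed per DAG run, so this either succeeds
            # immediately or never does for that run
            return context["execution_date"].weekday() == self.day_of_week

    # Gate the weekly rollup so it only proceeds on Sundays. soft_fail plus a
    # short timeout makes the other six days end as skips rather than failures:
    # gate = DayOfWeekSensor(task_id="sunday_gate", day_of_week=6,
    #                        soft_fail=True, timeout=60, dag=dag)
    # gate >> weekly_rollup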
On Wed, Aug 8, 2018 at 1:11 PM, Gabriel Silk <gs...@dropbox.com.invalid> wrote:

Hello Airflow community,

I have a basic question about how best to model a common data pipeline pattern here at Dropbox.

At Dropbox, all of our logs are ingested and written into Hive in hourly and/or daily rollups. On top of this data we build many weekly and monthly rollups, which typically run on a daily cadence and compute results over a rolling window.

If we have a metric X, it seems natural to put the daily, weekly, and monthly rollups for metric X all in the same DAG. However, the different rollups have different dependency structures: the daily job depends on only a single day partition, whereas the weekly job depends on 7 and the monthly on 28.

In Airflow, it seems the two paradigms for modeling dependencies are:

1) Depend on a *single run of a task* within the same DAG
2) Depend on *multiple runs of a task* by using an ExternalTaskSensor

I'm not sure how I could model this scenario using approach #1, and I'm not sure approach #2 is the most elegant or performant way to model it. Any thoughts or suggestions?
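For reference, a minimal sketch of approach #2 for the weekly case; the dag and task ids are illustrative assumptions, and execution_delta offsets the sensor a fixed distance back into past runs:

    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.operators.sensors import ExternalTaskSensor

    dag = DAG("weekly_rollup", start_date=datetime(2018, 8, 1),
              schedule_interval="@daily")

    rollup = DummyOperator(task_id="weekly_rollup", dag=dag)

    # One sensor per day in the rolling window: today's run of the daily
    # ingestion DAG plus the six prior runs.
    for i in range(7):
        wait = ExternalTaskSensor(
            task_id="wait_for_ingestion_minus_{}".format(i),
            external_dag_id="daily_ingestion",
            external_task_id="daily_log_ingestion",
            execution_delta=timedelta(days=i),
            dag=dag)
        wait >> rollup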