There's also the hack of using templating to skip executions. Say for a BashOperator:

{% if execution_date.weekday() == 1 %} echo "skipping today" {% else %} ./run_workload.sh {% endif %}
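For illustration, a minimal sketch of wiring that template into a task. This is a hedged example, not part of the original thread: the ./run_workload.sh script and the `dag` object are assumed to exist.

from airflow.operators.bash_operator import BashOperator

run_or_skip = BashOperator(
    task_id='run_workload',
    # bash_command is a templated field, so the Jinja branch is evaluated per
    # run: on the chosen weekday the task just echoes, otherwise it runs the job.
    bash_command=(
        '{% if execution_date.weekday() == 1 %} echo "skipping today" '
        '{% else %} ./run_workload.sh {% endif %}'
    ),
    dag=dag,
)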
On Wed, Aug 8, 2018 at 4:27 PM Gabriel Silk <gs...@dropbox.com.invalid> wrote:

Alexis, do you mean you would have done this using an ExternalTaskSensor? Or is there some other way to depend on a range of tasks?

On Wed, Aug 8, 2018 at 3:35 PM, Alexis Rolland <alexis.roll...@ubisoft.com> wrote:

Not sure if it's optimal compared to what James proposes, but I would have simply made the weekly and monthly rollup tasks downstream of the daily log ingestion tasks they depend on. Then I would have used the trigger rule 'all_done' to ensure those rollup tasks start once their parent tasks are completed.

https://airflow.incubator.apache.org/concepts.html#trigger-rules

(daily log ingestion) > (daily rollup)
(daily log ingestion) > (weekly rollup + TriggerRule.all_done)
(daily log ingestion) > (monthly rollup + TriggerRule.all_done)

Cheers

Alexis

On 9 Aug 2018, at 02:57, James Meickle <jmeic...@quantopian.com.invalid> wrote:

It sounds like you want something like this?

root_operator = DummyOperator()

def offset_operator(i):
    my_sql_query = "SELECT * FROM {{{{ ds_add(execution_date, {offset}) }}}};".format(offset=i)
    sql_operator = SQLOperator(task_id="offset_by_{}".format(i), query=my_sql_query)
    return sql_operator

offset_operators = list(offset_operator(i) for i in range(7))
root_operator >> offset_operators

# Daily just waits on today, no offset
do_daily_work = DummyOperator()
offset_operators[0] >> do_daily_work

# Weekly waits on today AND the six prior offsets
do_weekly_work = DummyOperator()
offset_operators >> do_weekly_work

IOW, every day you wait for that day's data to be available, and then run the daily job; you also wait for the previous six days' data to be available, and when it is, run the weekly job.

n.b. - if you do it this way you will have up to 7 tasks polling the "same" data point, which is slightly wasteful. But it's also not much code or mental effort to write it this way.

On Wed, Aug 8, 2018 at 2:44 PM Gabriel Silk <gs...@dropbox.com.invalid> wrote:

My main concern is how to express the fact that the weekly rollup depends on the previous 7 days' worth of data, and to ensure that it does not run until the tasks that generate those 7 days of data have run, assuming that tasks can run non-sequentially.

It's easy enough when you have the following situation:

(daily log ingestion) <-- (daily rollup)

In any given DAG run, you are guaranteed to have the data needed for (daily rollup), because the dependency that generated its data just ran.

But I'm not sure how best to model it when you have all of the following:

(daily log ingestion) <-- (daily rollup)
(daily log ingestion) <-- (weekly rollup)
(daily log ingestion) <-- (monthly rollup)
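For illustration, a hedged sketch of one way to express that "previous 7 days" dependency: a bank of ExternalTaskSensors, one per daily offset. This is not from the thread; it assumes Airflow 1.10-style import paths, and the DAG/task ids ('daily_log_ingestion', 'ingest_logs') plus the `dag` and `weekly_rollup` objects are hypothetical.

from datetime import timedelta

from airflow.sensors.external_task_sensor import ExternalTaskSensor

for offset in range(7):
    wait_for_day = ExternalTaskSensor(
        task_id='wait_for_ingestion_minus_{}'.format(offset),
        external_dag_id='daily_log_ingestion',   # hypothetical ingestion DAG
        external_task_id='ingest_logs',          # hypothetical ingestion task
        # execution_delta is subtracted from this run's execution_date, so
        # offset=0 waits on today's ingestion run and offset=6 on six days ago.
        execution_delta=timedelta(days=offset),
        dag=dag,
    )
    wait_for_day >> weekly_rollup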
On Wed, Aug 8, 2018 at 11:29 AM, Taylor Edmiston <tedmis...@gmail.com> wrote:

Gabriel -

Ah, I missed your earlier comment about weekly/monthly rollups also being on a daily cadence. So is your concern more about reducing the redundant processing of the weekly rollup tasks for the days of that range that were already processed in the previous DAG run(s)? Or mainly about the dependency of not executing the first weekly at all until the first 7 daily rollups' worth of data have built up?

*Taylor Edmiston*
Blog <https://blog.tedmiston.com/> | CV <https://stackoverflow.com/cv/taylor> | LinkedIn <https://www.linkedin.com/in/tedmiston/> | AngelList <https://angel.co/taylor> | Stack Overflow <https://stackoverflow.com/users/149428/taylor-edmiston>

On Wed, Aug 8, 2018 at 2:14 PM, James Meickle <jmeic...@quantopian.com.invalid> wrote:

If you want to run (daily, rolling weekly, rolling monthly) backups on a daily basis, and they're mostly the same but have some additional dependencies, you can write a DAG factory method, which you call three times. Certain nodes only get added to the longer-than-daily backups.

On Wed, Aug 8, 2018 at 2:03 PM Gabriel Silk <gs...@dropbox.com.invalid> wrote:

Thanks Andy and Taylor for the suggestions --

I see how that would work for the case where you want a weekly rollup that runs on a weekly cadence. But what about a rolling weekly or monthly rollup that runs each day?

On Wed, Aug 8, 2018 at 11:00 AM, Andy Cooper <andy.coo...@astronomer.io> wrote:

To expand on Taylor's idea:

I recently wrote a ScheduleBlackoutSensor that would allow you to prevent a task from running if it meets the criteria provided. It accepts an array of args for any number of the criteria, so you could leverage this sensor to provide "blackout" runs for a range of days of the week.

https://github.com/apache/incubator-airflow/pull/3702/files

For example,

task = ScheduleBlackoutSensor(day_of_week=[0,1,2,3,4,5], dag=dag)

would prevent a task from running Monday - Saturday, allowing it to run on Sunday.

You could leverage this sensor as you would any other sensor, or you could invert the logic so that you would only need to specify

task = ScheduleBlackoutSensor(day_of_week=6, dag=dag)

to "whitelist" a task to run on Sundays.

Let me know if you have any questions.

On Wed, Aug 8, 2018 at 1:47 PM, Taylor Edmiston <tedmis...@gmail.com> wrote:

Gabriel -

One approach I've seen for a similar use case is to have multiple related rollups in one DAG that runs daily, then have the non-daily tasks skip most of the time (e.g., weekly only actually executes on Sundays and is parameterized to look at the last 7 days).

You could implement that not-running part a few ways, but one idea is a sensor in front of the weekly rollup task. Imagine a SundaySensor whose check is simply return execution_date.weekday() == 6. One thing to keep in mind here is the dependence on the DAG's cron schedule being more granular than the tasks.

I think this could generalize into a DayOfWeekSensor / DayOfMonthSensor that would be nice to have.

Of course this does mean some scheduler inefficiency on the skip days, but as long as those skips are fast and the overall number of tasks is small, I can accept that.

*Taylor Edmiston*
Blog <https://blog.tedmiston.com/> | CV <https://stackoverflow.com/cv/taylor> | LinkedIn <https://www.linkedin.com/in/tedmiston/> | AngelList <https://angel.co/taylor> | Stack Overflow <https://stackoverflow.com/users/149428/taylor-edmiston>
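For illustration, a hedged sketch of what such a DayOfWeekSensor could look like. This is not an existing Airflow operator; it assumes Airflow 1.10-style import paths, and `dag` and `weekly_rollup` are hypothetical.

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults


class DayOfWeekSensor(BaseSensorOperator):
    """Succeeds only when the run's execution_date falls on `weekday`
    (Python convention: Monday=0 ... Sunday=6)."""

    @apply_defaults
    def __init__(self, weekday, *args, **kwargs):
        super(DayOfWeekSensor, self).__init__(*args, **kwargs)
        self.weekday = weekday

    def poke(self, context):
        # poke() is called repeatedly until it returns True or the sensor times out.
        return context['execution_date'].weekday() == self.weekday


# e.g. gate the weekly rollup so it only proceeds on Sunday runs:
# wait_for_sunday = DayOfWeekSensor(task_id='wait_for_sunday', weekday=6, dag=dag)
# wait_for_sunday >> weekly_rollup

Note that on non-Sunday runs a sensor like this pokes until it times out and fails rather than skipping; in practice you would likely pair it with soft_fail=True so the downstream rollup is marked skipped instead of failed.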
On Wed, Aug 8, 2018 at 1:11 PM, Gabriel Silk <gs...@dropbox.com.invalid> wrote:

Hello Airflow community,

I have a basic question about how best to model a common data pipeline pattern here at Dropbox.

At Dropbox, all of our logs are ingested and written into Hive in hourly and/or daily rollups. On top of this data we build many weekly and monthly rollups, which typically run on a daily cadence and compute results over a rolling window.

If we have a metric X, it seems natural to put the daily, weekly, and monthly rollups for metric X all in the same DAG.

However, the different rollups have different dependency structures. The daily job only depends on a single day partition, whereas the weekly job depends on 7, and the monthly on 28.

In Airflow, it seems the two paradigms for modeling dependencies are:
1) Depend on a *single run of a task* within the same DAG
2) Depend on *multiple runs of a task* by using an ExternalTaskSensor

I'm not sure how I could possibly model this scenario using approach #1, and I'm not sure approach #2 is the most elegant or performant way to model this scenario.

Any thoughts or suggestions?
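For illustration, a bare-bones sketch of the single-DAG layout (paradigm #1) the question describes. All task ids are hypothetical, the operators are stand-ins for the real ingestion and rollup jobs, and a daily-schedule `dag` object is assumed.

from airflow.operators.dummy_operator import DummyOperator

ingest = DummyOperator(task_id='daily_log_ingestion', dag=dag)
daily_rollup = DummyOperator(task_id='daily_rollup', dag=dag)
weekly_rollup = DummyOperator(task_id='weekly_rollup', dag=dag)
monthly_rollup = DummyOperator(task_id='monthly_rollup', dag=dag)

# Within a single daily DAG run, each rollup can only depend on this run's
# ingestion; covering the 7- and 28-day history is what the replies above
# address (sensors, trigger rules, templated skips, or a DAG factory).
ingest >> [daily_rollup, weekly_rollup, monthly_rollup]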