It sounds like you want something like this?

root_operator = DummyOperator()

def offset_operator(i):
  my_sql_query = "SELECT * FROM {{{{ds_add(execution_date, {offset})
}}}};".format(offset=i)
  sql_operator = SQLOperator(task_id="offset_by_{}".format(i)",
query=my_sql_query)
  return sql_operator

offset_operators = list(offset_operator(i) for i in range(7))
root_operator >> offset_operators

# Daily just waits on today, no offset
do_daily_work = DummyOperator()
offset_operators[0] >> do_daily_work

# Weekly waits on today AND the six prior offsets
do_weekly_work = DummyOperator()
offset_operators >> do_weekly_work

IOW, every day you wait for that day's data to be available, and then run
the daily job; you also wait for the previous six days data to be
available, and when it is, run the weekly job.

n.b. - if you do it this way you will have up to 7 tasks polling the "same"
data point, which is slightly wasteful. But it's also not much code or
mental effort to write it this way.

On Wed, Aug 8, 2018 at 2:44 PM Gabriel Silk <gs...@dropbox.com.invalid>
wrote:

> My main concern is how to express the fact that the weekly rollup depends
> on the previous 7 days worth of data, and ensure that it does not run until
> the tasks that generate those 7 days of data have run, assuming that tasks
> can run non-sequentially.
>
> It's easy enough when you have the following situation:
>
> (daily log ingestion) <-- (daily rollup)
>
> In any given DAG run, you are guaranteed to have the data needed for (daily
> rollup), because the dependency that generated its data just ran.
>
> But I'm not sure how best to model it when you have all of the following:
>
> (daily log ingestion) <-- (daily rollup)
> (daily log ingestion) <-- (weekly rollup)
> (daily log ingestion) <-- (monthly rollup)
>
>
>
> On Wed, Aug 8, 2018 at 11:29 AM, Taylor Edmiston <tedmis...@gmail.com>
> wrote:
>
> > Gabriel -
> >
> > Ah, I missed your earlier comment about weekly/monthly rollups also being
> > on a daily cadence.  So is your concern e.g., more about reducing the
> > redundant process of the weekly rollup tasks for the days of that range
> > that already processed in the previous DAG run(s)?  Or mainly about the
> > dependency of not executing the first weekly at all until the first 7
> daily
> > rollups worth of data have built up?
> >
> > *Taylor Edmiston*
> > Blog <https://blog.tedmiston.com/> | CV
> > <https://stackoverflow.com/cv/taylor> | LinkedIn
> > <https://www.linkedin.com/in/tedmiston/> | AngelList
> > <https://angel.co/taylor> | Stack Overflow
> > <https://stackoverflow.com/users/149428/taylor-edmiston>
> >
> >
> > On Wed, Aug 8, 2018 at 2:14 PM, James Meickle <jmeic...@quantopian.com.
> > invalid> wrote:
> >
> > > If you want to run (daily, rolling weekly, rolling monthly) backups on
> a
> > > daily basis, and they're mostly the same but have some additional
> > > dependencies, you can write a DAG factory method, which you call three
> > > times. Certain nodes only get added to the longer-than-daily backups.
> > >
> > > On Wed, Aug 8, 2018 at 2:03 PM Gabriel Silk <gs...@dropbox.com.invalid
> >
> > > wrote:
> > >
> > > > Thanks Andy and Taylor for the suggestions --
> > > >
> > > > I see how that would work for the case where you want a weekly rollup
> > > that
> > > > runs on a weekly cadence.
> > > >
> > > > But what about a rolling weekly or monthly rollup that runs each day?
> > > >
> > > > On Wed, Aug 8, 2018 at 11:00 AM, Andy Cooper <
> > andy.coo...@astronomer.io>
> > > > wrote:
> > > >
> > > > > To expand on Taylor's idea
> > > > >
> > > > > I recently wrote a ScheduleBlackoutSensor that would allow you to
> > > > prevent a
> > > > > task from running if it meets the criteria provided. It accepts an
> > > array
> > > > of
> > > > > args for any number of the criteria so you could leverage this
> sensor
> > > to
> > > > > provide "blackout" runs for a range of days of the week.
> > > > >
> > > > > https://github.com/apache/incubator-airflow/pull/3702/files
> > > > >
> > > > > For example,
> > > > >
> > > > > task = ScheduleBlackoutSensor(day_of_week=[0,1,2,3,4,5], dag=dag)
> > > > >
> > > > > Would prevent a task from running Monday - Saturday, allowing it to
> > run
> > > > on
> > > > > Sunday.
> > > > >
> > > > > You could leverage this Sensor as you would any other sensor or you
> > > could
> > > > > invert the logic so that you would only need to specify
> > > > >
> > > > > task = ScheduleBlackoutSensor(day_of_week=6, dag=dag)
> > > > >
> > > > > To "whitelist" a task to run on Sundays.
> > > > >
> > > > >
> > > > > Let me know if you have any questions
> > > > >
> > > > > On Wed, Aug 8, 2018 at 1:47 PM Taylor Edmiston <
> tedmis...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Gabriel -
> > > > > >
> > > > > > One approach I've seen for a similar use case is to have multiple
> > > > related
> > > > > > rollups in one DAG that runs daily, then have the non-daily tasks
> > > skip
> > > > > most
> > > > > > of the time (e.g., weekly only actually executes on Sundays and
> is
> > > > > > parameterized to look at the last 7 days).
> > > > > >
> > > > > > You could implement that not running part a few ways, but one
> idea
> > > is a
> > > > > > sensor in front of the weekly rollup task.  Imagine a
> SundaySensor
> > > like
> > > > > > return
> > > > > > execution_date.weekday() == 6.  One thing to keep in mind here is
> > > > > > dependence on the DAG's cron schedule being more granular than
> the
> > > > tasks.
> > > > > >
> > > > > > I think this could generalize into a DayOfWeekSensor /
> > > DayOfMonthSensor
> > > > > > that would be nice to have.
> > > > > >
> > > > > > Of course this does mean some scheduler inefficiency on the skip
> > > days,
> > > > > but
> > > > > > as long as those skips are fast and the overall number of tasks
> is
> > > > > small, I
> > > > > > can accept that.
> > > > > >
> > > > > > *Taylor Edmiston*
> > > > > > Blog <https://blog.tedmiston.com/> | CV
> > > > > > <https://stackoverflow.com/cv/taylor> | LinkedIn
> > > > > > <https://www.linkedin.com/in/tedmiston/> | AngelList
> > > > > > <https://angel.co/taylor> | Stack Overflow
> > > > > > <https://stackoverflow.com/users/149428/taylor-edmiston>
> > > > > >
> > > > > >
> > > > > > On Wed, Aug 8, 2018 at 1:11 PM, Gabriel Silk
> > > <gs...@dropbox.com.invalid
> > > > >
> > > > > > wrote:
> > > > > >
> > > > > > > Hello Airflow community,
> > > > > > >
> > > > > > > I have a basic question about how best to model a common data
> > > > pipeline
> > > > > > > pattern here at Dropbox.
> > > > > > >
> > > > > > > At Dropbox, all of our logs are ingested and written into Hive
> in
> > > > > hourly
> > > > > > > and/or daily rollups. On top of this data we build many weekly
> > and
> > > > > > monthly
> > > > > > > rollups, which typically run on a daily cadence and compute
> > results
> > > > > over
> > > > > > a
> > > > > > > rolling window.
> > > > > > >
> > > > > > > If we have a metric X, it seems natural to put the daily,
> weekly,
> > > and
> > > > > > > monthly rollups for metric X all in the same DAG.
> > > > > > >
> > > > > > > However, the different rollups have different dependency
> > > structures.
> > > > > The
> > > > > > > daily job only depends on a single day partition, whereas the
> > > weekly
> > > > > job
> > > > > > > depends on 7, the monthly on 28.
> > > > > > >
> > > > > > > In Airflow, it seems the two paradigms for modeling
> dependencies
> > > are:
> > > > > > > 1) Depend on a *single run of a task* within the same DAG
> > > > > > > 2) Depend on *multiple runs of task* by using an
> > ExternalTaskSensor
> > > > > > >
> > > > > > > I'm not sure how I could possibly model this scenario using
> > > approach
> > > > > #1,
> > > > > > > and I'm not sure approach #2 is the most elegant or performant
> > way
> > > to
> > > > > > model
> > > > > > > this scenario.
> > > > > > >
> > > > > > > Any thoughts or suggestions?
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to