I am also still in the "solve this in XCom" camp, but agree now that a prefix
is an awkward solution and could be difficult / confusing for users.
I think there's no question that on retry this state / XCom should be wiped
(to preserve idempotency). I think in the cases where state is
valuable between retries, this should be handled within the operator (don't
actually fail and trigger a retry) or with some sort of custom model in the
DB specific to that organization's implementation.
The two compelling reasons I have for keeping this in XCom are:
1) I think a new table for task state would be nearly identical to XCom
2) (bear with me here as this is a bit long, and I am going to use the word
"task" informally, denoted by quotes)
Let's consider what a reschedule operator is under the hood:
1. InitializeState "task" (e.g. start external job and record job_id)
2. n x UseState "task" (e.g. read job_id and poll for status), with the
caveat that if this fails we also want to retry the InitializeState "task".
If we use this mental model, then we *are* using XCom for *inter*-"task"
communication, its initial purpose (ergo XCom remains single-purpose, if we
relax the formality of task == one operator in a DAG / one entry in the DB).
However, we are making a (formal) task that "does two things" (e.g. starts a
job / waits for its completion); many existing operators already do this, as
it's pragmatic (to get the desired retry behavior).
To make this thought exercise more concrete, consider how one would achieve
this today (see the sketch after this list):
- SubDagOperator (which AFAIK still has the issue of blocking a worker
for the entire duration of the SubDag, but is pending improvements)
- InitializeStateOperator >(XCom job_id key)>
UseStateSensor(reschedule=True)
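A rough sketch of that second option, assuming Airflow 1.10+ (the class
names, the "initialize_state" task id, and the start_external_job /
get_job_status stubs are illustrative; xcom_push / xcom_pull and
mode="reschedule" are real):

from airflow.models import BaseOperator
from airflow.sensors.base_sensor_operator import BaseSensorOperator


def start_external_job():
    """Hypothetical: submit the external job and return its id."""
    raise NotImplementedError


def get_job_status(job_id):
    """Hypothetical: return the job's current status string."""
    raise NotImplementedError


class InitializeStateOperator(BaseOperator):
    def execute(self, context):
        # Start the external job and record its id for downstream pokes.
        job_id = start_external_job()
        context["ti"].xcom_push(key="job_id", value=job_id)


class UseStateSensor(BaseSensorOperator):
    def poke(self, context):
        # Each (rescheduled) poke re-reads the job_id pushed upstream.
        job_id = context["ti"].xcom_pull(
            task_ids="initialize_state", key="job_id"
        )
        return get_job_status(job_id) == "DONE"


# Wiring in a DAG:
# init = InitializeStateOperator(task_id="initialize_state", dag=dag)
# wait = UseStateSensor(task_id="use_state", mode="reschedule",
#                       poke_interval=300, dag=dag)
# init >> wait

Note this works because the XCom lives on the *upstream* task; the proposal
below is about letting a single rescheduled task read its own prior XCom.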
*Proposed Changes to XCom*
- Add a state column (boolean or nullable string)
- If the state column is false / null, then clear XCom at the beginning of
the task (maintaining current behavior)
- Else do not clear XCom (allowing a task instance to read the state value
pushed by the previous task instance)
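In pseudocode, the proposed clearing rule might look something like this (the
state column and this helper are hypothetical, not current Airflow code; the
other XCom columns are real):

from airflow.models import XCom


def clear_xcom_for_task(session, dag_id, task_id, execution_date):
    # Delete only the stateless rows; rows whose (hypothetical) state
    # column is set survive, so a rescheduled / retried task instance
    # can read back the value pushed by its previous try.
    session.query(XCom).filter(
        XCom.dag_id == dag_id,
        XCom.task_id == task_id,
        XCom.execution_date == execution_date,
        XCom.state.is_(None),  # hypothetical new column
    ).delete(synchronize_session=False)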
I'd like to hear other thoughts about adding an extra column to XCom, and
whether it should be an is_stateful boolean or a state_type string (nullable,
but more flexible for other use cases).
Another interesting question: should the rescheduled task instances
(pokes) be allowed to mutate this state? That's not useful for polling for
job completion, but might be useful for other kinds of rescheduled operations
after an initialized state.
Jarek,
As far as adding a state column to the TaskReschedule table, my
understanding is that this table keeps track of task reschedules: its first
entry is the first reschedule (not the first / originally scheduled task
instance). Would the state value be duplicated for each reschedule, or always
read from the first reschedule for a given task / dag / execution date?
Having the state associated w/ reschedules but not the original task
instance seems confusing to me (though technically possible to work with).
I suppose I'm just reiterating what Fokko said about XCom granularity
seeming more appropriate.
On Sat, Jan 11, 2020 at 3:05 PM Driesprong, Fokko <[email protected]>
wrote:
> Kaxil,
>
> I would not use a prefix, but a reserved key to have a single state field
> for the (dag, task, execution_date) tuple. When fetching the xcom values,
> we have to exclude the task that fetches these values.
>
> Daniel,
>
> Thanks for the scenarios.
>
> The first scenario could become idempotent if you fetch everything up to
> the execution time. Also, if you have a modern data platform, fetching the
> watermark can be constant-time. I've also mentioned this on the Slack, but
> you can:
> - Keep statistics of the column in Hive
> - Fetch the max from the footers of the Parquet file, so you don't need to
> read the actual data
> - More recent formats, like Iceberg and Delta Lake, have the min/max of the
> column available as well, and this operation is even constant-time.
>
> > This value needs to be stored somewhere.
>
> This data is already available, but would you want to duplicate it in
> Airflow for optimization? My answer would be: no.
>
> Jarek,
>
> > I believe the whole idea of Airflow is to operate on fixed time
> > intervals. We always have fixed intervals and if we re-run an interval's
> > processing it's always "all-or-nothing" for that interval. I.e. we do
> > not store or care for a watermark. If we decide to re-process an
> > interval of data, we always do it for all the data for that interval at
> > the source -> replacing the whole "interval-related" data in the output.
> > We are never supposed to process incremental data. This is a very basic
> > and fundamental assumption of Airflow - that it operates on fixed
> > ("batch") intervals. If you want to process "streaming" data where you
> > care for watermarks and "incremental" processing, you should use other
> > systems - Apache Beam might be a good choice for that, for example.
>
> This is similar to how I see Airflow. For example, reading from S3 using a
> templated path s3a://bucket/table/dt=2019-01-10/, which replaces a
> single day partition in your favorite database.
>
> Daniel,
>
> > To banish anything stateful seems arbitrary and unnecessary. Airflow is
> > more than its canonical task structure: hook / operator framework and
> > ecosystem, scheduling, retry, alerting, distributed computing, etc etc
> > etc etc.
>
> I think this paragraph is trying to say that:
> https://github.com/apache/airflow#beyond-the-horizon
> Besides that, and I've already mentioned this earlier: I think there is a
> place for state within Airflow; however, the scenarios that you describe
> can also be done without keeping state. Of course you will sacrifice a bit
> of performance here. Having state makes things more complicated, and should
> only be used when there are no other options.
>
> Keeping something like the job ID of the {Athena, BigQuery} job that you're
> tracking between async polls makes sense to me. But something like the
> watermarks would not be my choice. Also, as committers we need to make
> sure, when there are stateful operators, that we do the evolution of the
> state properly. In case you update your operator in Airflow and hit some
> old state that was written a couple of versions back, it should still work.
>
> Cheers, Fokko
>
> On Sat, Jan 11, 2020 at 11:10 PM Daniel Standish <[email protected]>
> wrote:
>
> > To banish anything stateful seems arbitrary and unnecessary. Airflow is
> > more than its canonical task structure: hook / operator framework and
> > ecosystem, scheduling, retry, alerting, distributed computing, etc etc
> > etc etc.
> >
> > As long as support for the canonical task is preserved, what's the harm
> > in supporting stateful usage where it makes sense?
> >
> > Airflow may not have been designed initially to support incremental
> > processes. But it is a living thing, and as it happens, it can work well
> > for them.
> >
> > I think the two approaches can coexist harmoniously.
> >
> >
> >
> >
> > On Sat, Jan 11, 2020 at 1:33 PM Jarek Potiuk <[email protected]>
> > wrote:
> >
> > > Pandora's box it is indeed :)
> > >
> > > @Maxime Beauchemin <[email protected]> -> maybe you could
> > > chime in here. I think of you still as the gatekeeper (or at least a
> > > Yoda master) of the very basic ideas behind Apache Airflow, and I
> > > think your insight here would be really valuable.
> > >
> > > >
> > > > *Scenario 1: incremental data pull*
> > > > If you are incrementally pulling data from a database, each time you
> > > > only want to pull the records that were modified, so you want to
> > > > keep track of the `datetime_modified` column.
> > > > Each run, you check the max modified date in source and store it.
> > > > This is your "high watermark" for this run. Next run, you pull from
> > > > the last high watermark.
> > > > In a sense you can't really design this process to be idempotent: if
> > > > you rerun the interval ('2019-12-01T10:00:00', '2019-12-01T11:00:00')
> > > > you might not get the same data (or any data at all) because in the
> > > > source, those records may have been updated (with a new modified
> > > > date).
> > > >
> > >
> > > I believe the whole idea of Airflow is to operate on fixed time
> > > intervals. We always have fixed intervals and if we re-run an
> > > interval's processing it's always "all-or-nothing" for that interval.
> > > I.e. we do not store or care for a watermark. If we decide to
> > > re-process an interval of data, we always do it for all the data for
> > > that interval at the source -> replacing the whole "interval-related"
> > > data in the output. We are never supposed to process incremental data.
> > > This is a very basic and fundamental assumption of Airflow - that it
> > > operates on fixed ("batch") intervals. If you want to process
> > > "streaming" data where you care for watermarks and "incremental"
> > > processing, you should use other systems - Apache Beam might be a good
> > > choice for that, for example.
> > >
> > >
> > > > *Scenario 2: incremental dim / fact / reporting processes in
> > > > database*
> > > > Suppose I am building a fact table. It has 10 source tables. I need
> > > > to make this run incrementally. It's possible that there may be
> > > > differences in update cadence in source tables. One way to approach
> > > > this is in each run you calculate max modified in each source table,
> > > > and take the min of all of them. That's your high watermark for this
> > > > run. Next time, you have to process data from there. This value
> > > > needs to be stored somewhere.
> > > >
> > >
> > > Again - if you are not able to split the data into fixed intervals,
> > > and cannot afford re-processing of the whole interval of data rather
> > > than incremental processing, you should look for another solution.
> > > Airflow is not designed (and I think it should never do it) for
> > > streaming/incremental data processing. It is designed to handle
> > > fixed-time batches of data. Airflow is not about optimising and
> > > processing as little data as possible. It's all about processing
> > > fixed intervals fully so that the processing can be as simple as
> > > possible - at the expense of sometimes processing the same data
> > > again-and-again.
> > >
> > > > *Scenario 3: dynamic date range / resuming large "initial load"
> > > > processes*
> > > > Pulling data from APIs, often we might want it to run daily or
> > > > hourly. Using backfill functionality is sometimes prohibitively slow
> > > > because you have to carve up years of data into hourly or daily
> > > > chunks. One approach is to make a temporary `backfill_` job with a
> > > > modified schedule (e.g. monthly) and let that run from the beginning
> > > > of time (with catchup=True).
> > > > Alternatively you could instead design it in a stateful way. On the
> > > > initial run, pull from the beginning of time. Thereafter, pull from
> > > > the last run time (and maybe you need to do a lookback of 45 days or
> > > > something, because data in source can change). Perhaps in your
> > > > initial load you don't want to pull by day (too slow) but you also
> > > > don't want to pull in one batch -- so you carve up batches that are
> > > > appropriate to the situation. And this is where it's helpful to have
> > > > a state persistence mechanism: you can use this to store your
> > > > progress on initial load, and in the event of failure, resume from
> > > > the point of failure. Yes you _could_ parse it from s3 or wherever,
> > > > but doing so presents its own challenges and risks, and it is
> > > > convenient to just store it in the database -- and not necessarily
> > > > any more problematic.
> > > >
> > >
> > > Same here - if your data source is not providing data in fixed
> > > intervals, I think Apache Airflow might not be the best choice.
> > >
> > >
> > > >
> > > > *Scenario 4: no access*
> > > > As pointed out earlier, sometimes you might not have access to the
> > > > target. E.g. I am pushing to someone else's s3 bucket and we only
> > > > have PutObject but can't read what's there. So we can't look at the
> > > > target to infer state.
> > > >
> > > > I'm sure there are other use cases out there. Anything "incremental"
> > > > implies a state.
> > > >
> > >
> > > That's the point where I think there might be a problem. Airflow is
> > > not designed to support incremental sources of data, and trying to
> > > convert Airflow to such a use case is probably not a good idea. Maybe
> > > it's just the same as trying to use an axe to hammer a nail. It will
> > > work sometimes, but maybe it's better to use a hammer instead.
> > >
> > > J.
> > >
> >
>
--
*Jacob Ferriero*
Strategic Cloud Engineer: Data Engineering
[email protected]
617-714-2509