Kaxil, I would not use a prefix, but a reserved key, so that there is a single state field per (dag, task, execution_date) tuple. When fetching the XCom values, we then have to exclude the task that fetches these values.
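To make that concrete, a rough sketch of what I have in mind (the key name is made up; xcom_push/xcom_pull are the existing TaskInstance methods):

    # Hypothetical reserved key: one state field per
    # (dag_id, task_id, execution_date) tuple.
    RESERVED_STATE_KEY = "__airflow_state__"

    def save_state(ti, state):
        # Store the operator state next to the regular XCom values.
        ti.xcom_push(key=RESERVED_STATE_KEY, value=state)

    def load_state(ti):
        # Pull the state back. Regular XCom fetches should exclude
        # this reserved key, so user code never sees it.
        return ti.xcom_pull(task_ids=ti.task_id, key=RESERVED_STATE_KEY)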
Daniel, thanks for the scenarios. The first scenario can become idempotent if you fetch everything up to the execution time. Also, if you have a modern data platform, fetching the watermark can be a constant-time operation. I've also mentioned this on Slack, but you can:

- Keep statistics of the column in Hive
- Fetch the max from the footers of the Parquet files, so you don't need to read the actual data (a short pyarrow sketch follows further down)
- More recent formats, like Iceberg and Delta Lake, have the min/max of the column available as well, and there this operation is even constant-time

> This value needs to be stored somewhere.

This data is already available, but would you want to duplicate it in Airflow for optimization? My answer would be: no.

Jarek,

> I believe the whole idea of Airflow is to operate on fixed time intervals.
> We always have fixed intervals and if we re-run an interval processing it's
> always "all-or-nothing" for that interval. I.e. we do not store or care for
> watermark. If we decide to re-process an interval of data, we always do it
> for all the data for that interval at the source -> replacing the whole
> "interval-related" data in the output. We are never supposed to process
> incremental data. This is a very basic and fundamental assumption of
> Airflow - that it operates on fixed ("batch") intervals. If you want to
> process "streaming" data where you care for watermarks and "incremental"
> processing you should use other systems - Apache Beam might be a good
> choice for that for example.

This is similar to how I see Airflow. For example, reading from S3 using a templated path such as s3a://bucket/table/dt=2019-01-10/, and replacing a single day partition in your favorite database.

Daniel,

> To banish anything stateful seems arbitrary and unnecessary. Airflow is
> more than its canonical task structure: hook / operator framework and
> ecosystem, scheduling, retry, alerting, distributed computing, etc etc etc
> etc.

I think this paragraph is trying to say just that: https://github.com/apache/airflow#beyond-the-horizon

Besides that, and I've already mentioned this earlier: I think there is a place for state within Airflow, but the scenarios that you describe can also be done without keeping state. Of course you will sacrifice a bit of performance here. Having state makes things more complicated, and it should only be used when there are no other options. Keeping something like the job ID of the Athena or BigQuery job that you're tracking between async polls makes sense to me. But something like the watermarks would not be my choice.

Also, as committers we need to make sure that, when there are stateful operators, we handle the evolution of the state properly. If you update your operator in Airflow and it hits some old state written a couple of versions back, it should still work.

Cheers, Fokko

On Sat, Jan 11, 2020 at 23:10 Daniel Standish <dpstand...@gmail.com> wrote:

> To banish anything stateful seems arbitrary and unnecessary. Airflow is
> more than its canonical task structure: hook / operator framework and
> ecosystem, scheduling, retry, alerting, distributed computing, etc etc etc
> etc.
>
> As long as support for the canonical task is preserved, what's the harm in
> supporting stateful usage where it makes sense?
>
> Airflow may not have been designed initially to support incremental
> processes. But it is a living thing, and as it happens, it can work well
> for them.
>
> I think the two approaches can coexist harmoniously.
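P.S. The pyarrow sketch promised above, for fetching the max from the Parquet footers (the column name is made up); only the footer metadata is read, never the data pages:

    import pyarrow.parquet as pq

    def max_from_footers(path, column):
        # Read only the file footer; the data pages are never touched.
        metadata = pq.ParquetFile(path).metadata
        column_index = metadata.schema.names.index(column)
        maxima = []
        for i in range(metadata.num_row_groups):
            stats = metadata.row_group(i).column(column_index).statistics
            if stats is not None and stats.has_min_max:
                maxima.append(stats.max)
        return max(maxima) if maxima else None

    # e.g. max_from_footers("part-00000.parquet", "datetime_modified")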
> On Sat, Jan 11, 2020 at 1:33 PM Jarek Potiuk <jarek.pot...@polidea.com> wrote:
>
> > Pandora's box it is indeed :)
> >
> > @Maxime Beauchemin <maximebeauche...@gmail.com> -> maybe you could chime
> > in here. I think of you still as the gatekeeper (or at least a Yoda
> > master) of the very basic ideas behind Apache Airflow, and I think your
> > insight here would be really valuable.
> >
> > > *Scenario 1: incremental data pull*
> > > If you are incrementally pulling data from a database, each time you
> > > only want to pull the records that were modified. So you want to keep
> > > track of the `datetime_modified` column.
> > > Each run, you check the max modified date in the source and store it.
> > > This is your "high watermark" for this run. Next run, you pull from the
> > > last high watermark.
> > > In a sense you can't really design this process to be idempotent: if
> > > you rerun the interval ('2019-12-01T10:00:00', '2019-12-01T11:00:00')
> > > you might not get the same data (or any data at all) because in the
> > > source, those records may have been updated (with a new modified date).
> >
> > I believe the whole idea of Airflow is to operate on fixed time intervals.
> > We always have fixed intervals and if we re-run an interval processing it's
> > always "all-or-nothing" for that interval. I.e. we do not store or care for
> > watermark. If we decide to re-process an interval of data, we always do it
> > for all the data for that interval at the source -> replacing the whole
> > "interval-related" data in the output. We are never supposed to process
> > incremental data. This is a very basic and fundamental assumption of
> > Airflow - that it operates on fixed ("batch") intervals. If you want to
> > process "streaming" data where you care for watermarks and "incremental"
> > processing you should use other systems - Apache Beam might be a good
> > choice for that for example.
> >
> > > *Scenario 2: incremental dim / fact / reporting processes in database*
> > > Suppose I am building a fact table. It has 10 source tables. I need to
> > > make this run incrementally. It's possible that there may be differences
> > > in update cadence in the source tables. One way to approach this is in
> > > each run you calculate the max modified date in each source table, and
> > > take the min of all of them. That's your high watermark for this run.
> > > Next time, you have to process data from there. This value needs to be
> > > stored somewhere.
> >
> > Again - if you are not able to split the data into fixed intervals, and
> > cannot afford re-processing of the whole interval of data rather than
> > incremental processing, you should look for another solution. Airflow is
> > not designed (and I think it should never do it) for streaming/incremental
> > data processing. It is designed to handle fixed-time batches of data.
> > Airflow is not about optimising and processing as little data as possible.
> > It's all about processing fixed intervals fully so that the processing can
> > be as simple as possible - at the expense of sometimes processing the same
> > data again and again.
> >
> > > *Scenario 3: dynamic date range / resuming large "initial load" processes*
> > > Pulling data from APIs, often we might want it to run daily or hourly.
> > > Using backfill functionality is sometimes prohibitively slow because you
> > > have to carve up years of data into hourly or daily chunks.
> > > One approach is to make a temporary `backfill_` job with a modified
> > > schedule (e.g. monthly) and let that run from the beginning of time
> > > (with catchup=True).
> > > Alternatively, you could instead design it in a stateful way. On the
> > > initial run, pull from the beginning of time. Thereafter, pull from the
> > > last run time (and maybe you need to do a lookback of 45 days or
> > > something, because data in the source can change). Perhaps in your
> > > initial load you don't want to pull by day (too slow) but you also don't
> > > want to pull in one batch -- so you carve up batches that are appropriate
> > > to the situation. And this is where it's helpful to have a state
> > > persistence mechanism: you can use it to store your progress on the
> > > initial load, and in the event of failure, resume from the point of
> > > failure. Yes, you _could_ parse it from S3 or wherever, but doing so
> > > presents its own challenges and risks, and it is convenient to just
> > > store it in the database -- and not necessarily any more problematic.
> >
> > Same here - if your data source is not providing data in fixed intervals,
> > I think Apache Airflow might not be the best choice.
> >
> > > *Scenario 4: no access*
> > > As pointed out earlier, sometimes you might not have access to the
> > > target. E.g. I am pushing to someone else's S3 bucket and we only have
> > > PutObject but can't read what's there. So we can't look at the target to
> > > infer state.
> > >
> > > I'm sure there are other use cases out there. Anything "incremental"
> > > implies a state.
> >
> > That's the point where I think there might be a problem. Airflow is not
> > designed to support an incremental source of data, and trying to convert
> > Airflow to such a use case is probably not a good idea. Maybe it's just
> > the same as trying to use an axe to hammer a nail. It will work sometimes,
> > but maybe it's better to use a hammer instead.
> >
> > J.
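P.S. To make the fixed-interval alternative to scenario 1 concrete: bound the pull by the schedule interval on both sides instead of keeping a watermark. A rough sketch (table and column names are made up; the macros are the standard Airflow template variables, and the exact SQL syntax depends on your database):

    # Both bounds come from the schedule interval, so re-running the
    # interval issues exactly the same query - no stored watermark needed.
    # Jarek's caveat still applies: rows updated later get a new
    # datetime_modified and simply move to a later interval.
    PULL_SQL = """
        SELECT *
        FROM source_table
        WHERE datetime_modified >= TIMESTAMP '{{ execution_date }}'
          AND datetime_modified <  TIMESTAMP '{{ next_execution_date }}'
    """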