+1 (binding)

On Wed, Jun 1, 2022 at 12:34 PM Ash Berlin-Taylor <[email protected]> wrote:
> Hi All,
>
> Now that Summit is over (well done all the speakers! The talks I've
> caught so far have been great) I'm ready to push forward with Data
> Driven Scheduling, and I would like to call for a vote on
> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-48+Data+Dependency+Management+and+Data+Driven+Scheduling
>
> The vote will last for 7 days, until 2022/06/07 at 16:30 UTC.
>
> (This is my +1 vote)
>
> I have just published updates to the AIP, hopefully making it tighter
> in scope (and easier to implement too). The tl;dr of this AIP:
>
> - Add a concept of Dataset (which is a URI-parsable str. Airflow places
>   no meaning on what the URI contains/means/is - the "airflow:" scheme
>   is reserved)
> - A task "produces" a dataset by a) having it in its outlets attribute,
>   and b) finishing with SUCCESS. (That is, Airflow doesn't know/care
>   about data transfer/SQL tables etc. It is just conceptual)
> - A DAG says that it wants to be triggered when its dataset (or any of
>   its datasets) changes. When this happens the scheduler will create
>   the DAG run.
>
> This is just a high-level summary; please read the Confluence page for
> full details.
>
> We have already thought about lots of ways we can (and will) extend
> this over time, detailed in the "Future work" section. Our goal with
> this AIP is to build the kernel of Data-aware Scheduling that we can
> build on over time.
>
> A teaser/example DAG that hopefully gives a clue as to what we are
> talking about here:
>
> ```
> import pandas as pd
>
> from airflow import dag, Dataset
>
>
> dataset = Dataset("s3://s3_default@some_bucket/order_data")
>
>
> @dag
> def my_dag():
>
>     @dag.task(outlets=[dataset])
>     def producer():
>         # What this task actually does doesn't matter to Airflow; the
>         # simple act of running to SUCCESS means the dataset is
>         # updated, and downstream dags will get triggered
>         ...
>
>     producer()
>
>
> dataset = Dataset("s3://s3_default@some_bucket/order_data")
>
>
> @dag(schedule_on=dataset)
> def consuming_dag():
>
>     @dag.task
>     def consumer(uri):
>         df = pd.read_from_s3(uri)
>         print(f"Dataset had {df.count()} rows")
>
>     consumer(uri=dataset.uri)
> ```
>
> If anyone has any changes they think are fundamental/foundational to
> the core idea, you have 1 week to raise it :) (Names of parameters we
> can easily change as we implement this.) Our desire is to get this
> written and released in Airflow 2.4.
>
> Thanks,
> Ash
>
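To make the "any of its datasets" bullet in the quoted mail concrete, here is a minimal sketch in the same proposed style as the teaser DAG. It assumes `schedule_on` can take a list of Datasets (an inference from the "any of its datasets" wording, not something the AIP has confirmed), and the dataset URIs and `build_report` task are purely illustrative:

```
from airflow import dag, Dataset

# Two upstream datasets, each assumed to be produced (via outlets=[...])
# by some other DAG's task finishing with SUCCESS.
orders = Dataset("s3://s3_default@some_bucket/order_data")
customers = Dataset("s3://s3_default@some_bucket/customer_data")


# Assumption: schedule_on accepts a list, so an update to either dataset
# triggers a DAG run. Parameter names may change as the AIP is implemented.
@dag(schedule_on=[orders, customers])
def reporting_dag():

    @dag.task
    def build_report(order_uri, customer_uri):
        # Airflow only tracks that the datasets changed; actually reading
        # the data at these URIs is still up to the task itself.
        print(f"Rebuilding report from {order_uri} and {customer_uri}")

    build_report(order_uri=orders.uri, customer_uri=customers.uri)
```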
