+1 (binding)

On Wed, Jun 1, 2022 at 12:34 PM Ash Berlin-Taylor <[email protected]> wrote:

> Hi All,
>
> Now that Summit is over (well done all the speakers! The talks I've caught
> so far have been great) I'm ready to push forward with Data Driven
> Scheduling, and I would like to call for a vote on
> https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-48+Data+Dependency+Management+and+Data+Driven+Scheduling
>
> The vote will last for 7 days, until 2022/06/07 at 16:30 UTC.
>
> (This is my +1 vote)
>
> I have just published updates to the AIP, hopefully to make the AIP
> tighter in scope (and easier to implement too). The tl;dr of this AIP:
>
> - Add a concept of Dataset (a URI-parsable str. Airflow places no meaning
> on what the URI contains/means/points to - the "airflow:" scheme is reserved)
> - A task "produces" a dataset by a) having it in its outlets attribute,
> and b) finishing with SUCCESS. (That is, Airflow doesn't know/care about
> data transfer/SQL tables etc. The dataset is purely conceptual.)
> - A DAG says that it wants to be triggered when its dataset (or any of
> its datasets) changes. When this happens the scheduler will create the DAG
> run.
>
> This is just a high-level summary; please read the Confluence page for
> full details.
>
> We have already thought about lots of ways we can (and will) extend this,
> detailed in the "Future work" section. Our goal with this AIP is to build
> the kernel of Data-aware Scheduling that we can build on over time.
>
> A teaser/example DAG that hopefully gives a clue as to what we are talking
> about here:
>
> ```
> import pandas as pd
>
> from airflow import dag, Dataset
>
>
> dataset = Dataset("s3://s3_default@some_bucket/order_data")
>
>
> @dag
> def my_dag():
>
>     @dag.task(outlets=[dataset])
>     def producer():
>         # What this task actually does doesn't matter to Airflow; the simple
>         # act of running to SUCCESS means the dataset is updated, and
>         # downstream dags will get triggered
>         ...
>
>     producer()
>
>
> my_dag()
>
>
> # A second DAG (which could live in a separate file) consuming the same dataset
> dataset = Dataset("s3://s3_default@some_bucket/order_data")
>
>
> @dag(schedule_on=dataset)
> def consuming_dag():
>     @dag.task
>     def consumer(uri):
>         # How the data is read is up to the task - this call is illustrative
>         df = pd.read_from_s3(uri)
>         print(f"Dataset had {len(df)} rows")
>
>     consumer(uri=dataset.uri)
>
>
> consuming_dag()
> ```
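>
> If schedule_on also accepts a list (the exact form isn't final), scheduling
> on more than one dataset could look like this minimal sketch (the dataset
> URIs below are purely illustrative):
>
> ```
> dataset_a = Dataset("s3://s3_default@some_bucket/order_data")
> dataset_b = Dataset("s3://s3_default@some_bucket/customer_data")
>
>
> @dag(schedule_on=[dataset_a, dataset_b])
> def multi_consumer_dag():
>     @dag.task
>     def report():
>         # Runs whenever either upstream dataset is updated
>         ...
>
>     report()
>
>
> multi_consumer_dag()
> ```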
>
> If anyone has any changes you think are fundamental/foundational to the
> core idea, you have 1 week to raise them :) (Names of parameters we can
> easily change as we implement this.) Our desire is to get this written and
> released in Airflow 2.4.
>
> Thanks,
> Ash
>
>
