This vote is now passed with:
7 x +1 binding votes (Jed, Jarek, Vikram, Ephraim, Kaxil, Elad, Brent)
and 5 non-binding (Drew, Pankaj, Dennis, Phani, Pierre)
0x -1 votes
We'll start work on this shortly (first step is to create a milestone
and create initial tickets.)
Cheers,
Ash
On Wed, Jun 1 2022 at 17:34:13 +0100, Ash Berlin-Taylor
<[email protected]> wrote:
Hi All,
Now that Summit is over (well done all the speakers! The talks I've
caught so far have been great) I'm ready to push forward with Data
Driven Scheduling, and I would like to call for a vote on
<https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-48+Data+Dependency+Management+and+Data+Driven+Scheduling>
The vote for last for 7 days, until 2022/06/07 at 16:30 UTC.
(This is my +1 vote)
I have just published updates to the AIP, hopefully to make the AIP
tighter in scope (and easier to implement too). The tl;dr of this AIP:
- Add a concept of Dataset (which is a uri-parsable str. Airflow
places no meaning on what the URI contains/means/is - "airflow:"
scheme is reserved)
- A task "produces" a dataset by a) Having it in it's outlets
attribute, and b) finishing with SUCCESS. (That is, Airflow doesn't
know/care about data transfer/SQL tables etc. It is just conceptually)
- A DAG says that it wants to be triggered when it's dataset (or any
of it's datasets) change. When this happens the scheduler will create
the dag run.
This is just a high level summary, please read the confluence page
for full details.
We have already thought about lots of ways we can (and will) extend
this in the over time, detailed in the "Future work" section. Our
goal with this AIP is to build the kernel of Data-aware Scheduling
that we can build on over time.
A teaser/example DAG that hopefully gives a clue as to what we are
talking about here:
```
import pandas as pd
from airflowimport dag, Dataset
dataset= Dataset("s3://s3_default@some_bucket/order_data")
@dag
def my_dag():
@dag.task(outlets=[dataset])
def producer():
# What this task actually does doesn't matter to Airflow, the
simple act of running to SUCCESS means the dataset
# is updated, and downstream dags will get triggered
...
dataset= Dataset("s3://s3_default@some_bucket/order_data")
@dag(schedule_on=dataset)
def consuming_dag():
@dag.task
def consumer(uri):
df= pandas.read_from_s3(uri)
print(f" Dataset had {df.count()} rows")
consumer(df=ref.uri)
```
If anyone has any changes you think are fundamental/foundational to
the core idea you have 1 week to raise it :) (Names of parameters we
can easily change as we implement this) Our desire is to get this
written and released Airflow 2.4.
Thanks,
Ash