Hello everyone!
I'd like to start a discussion about a new feature that we've been thinking
about for a while, and that Ash mentioned in his Airflow Summit keynote:
Dynamic Task Mapping -- having the number of tasks vary at run time based on
the result of a previous step.
The tl;dr of this is that it adds a form of "dynamism" to Airflow DAGs that
wasn't previously possible, and opens up a lot of exciting new workflows for
DAGs.
A short example to whet your appetites:
from typing import Iterable

from airflow.decorators import task

@task
def my_task(x, y):
    return x + y

@task
def sum_values(x: Iterable[int]) -> int:
    return sum(x)

add_values = [1, 2, 3]

added_values = my_task.partial(x=5).map(y=add_values)
result = sum_values(added_values)
Now this example is trivial and doesn't need multiple tasks, but it shows the
power of what we can do. At run time this would result in 4 task instances
being run, and the end result is 21.
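To make the arithmetic explicit, here is the same expansion written out as plain Python (only an illustration of what the scheduler would do, not the Airflow API):

x = 5
add_values = [1, 2, 3]

# map() creates one my_task instance per element of add_values ...
added_values = [x + y for y in add_values]   # [6, 7, 8]

# ... plus a single sum_values instance to reduce them: 4 task instances in total
result = sum(added_values)                    # 6 + 7 + 8 == 21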
A more useful example would be to use an S3PrefixSensor, then operate on each
file in a separate task:
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.sensors.s3_prefix import S3PrefixSensor

with dag:
    sensor = S3PrefixSensor(
        task_id='wait_for_incoming_files',
        bucket_name='test-airflow-bucket',
        prefix='vendor_a/incoming/{{ data_interval_start }}/',
    )

    @task
    def process_file(key: str):
        # Count the lines in one of the files the sensor found
        return len(S3Hook().read_key(key).splitlines())

    processed = process_file.map(key=sensor.output)

    @task
    def sum_values(x: Iterable[int]) -> int:
        return sum(x)

    result = sum_values(processed)
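The important difference from the first example is that the number of process_file task instances is only known at run time: it depends on how many keys the sensor finds. Roughly speaking (and assuming the sensor's XCom is the list of matched keys, as sketched here with made-up key names), the expansion behaves like:

# Hypothetical keys pushed by the sensor at run time -- could be any number
keys = ['vendor_a/incoming/2021-11-01/a.csv', 'vendor_a/incoming/2021-11-01/b.csv']

# One process_file instance per key, each counting the lines in one file
processed = [len(S3Hook().read_key(key).splitlines()) for key in keys]

# A single sum_values instance reduces the per-file counts to a grand total
result = sum(processed)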
This API change is (hopefully) deceptively simple, but implementing it is not
straightforward -- we've tried to go into lots of detail in the AIP document:
AIP-42: Dynamic Task Mapping - Airflow - Apache Software Foundation
[https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-42%3A+Dynamic+Task+Mapping]
We look forward to discussing this further as this is a feature we’re all
really excited about :).
Happy Friday!
Ash Berlin-Taylor, TP Chung, and Daniel Imberman