Hello everyone!

I'd like to start a discussion about a new feature that we've been thinking 
about for a while, and that Ash mentioned in his Airflow Summit keynote:


Dynamic Task Mapping -- having the number of tasks vary at run time based on 
the result of a previous step


The tl;dr of this is that it adds a form of "dynamism" to Airflow DAGs that 
wasn't previously possible, and opens up a lot of exciting new workflows for 
DAGs.


A short example to whet your appetites:


from typing import Iterable

from airflow.decorators import task


@task
def my_task(x, y):
    return x + y


@task
def sum_values(x: Iterable[int]) -> int:
    return sum(x)


add_values = [1, 2, 3]

# partial() pins x=5 for every copy of my_task; map() creates one task
# instance per element of add_values at run time.
added_values = my_task.partial(x=5).map(y=add_values)
result = sum_values(added_values)


Now this example is trivial and doesn't need multiple tasks, but it shows the 
power of what we can do. At run time this would result in 4 task instances 
being run: three mapped copies of my_task computing 5+1, 5+2, and 5+3, plus 
one sum_values task, so the end result is 6 + 7 + 8 = 21.
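

The list being mapped over doesn't have to be a literal, either -- under the 
proposal it can just as easily be the output of an upstream task, in which 
case the number of task instances isn't known until that task has actually 
run. A minimal sketch reusing my_task from above (make_list is a made-up 
helper, not part of the proposal):


@task
def make_list():
    # The length of this list -- and therefore the number of mapped
    # my_task instances -- is only known once this task has run
    return [1, 2, 3]

added_values = my_task.partial(x=5).map(y=make_list())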


A more useful example would be to use an S3PrefixSensor, then operate on each 
file in a separate task:


from typing import Iterable

from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.sensors.s3_prefix import S3PrefixSensor

with dag:
    # The sensor waits for matching keys, and its output is the list of
    # keys it found
    sensor = S3PrefixSensor(
        task_id='wait_for_files',
        bucket_name='test-airflow-bucket',
        prefix='vendor_a/incoming/{{ data_interval_start }}/',
    )

    @task
    def process_file(key: str):
        # Count the lines in one file
        return len(S3Hook().read_key(key).splitlines())

    # One process_file task instance per key the sensor found
    processed = process_file.map(key=sensor.output)

    @task
    def sum_values(x: Iterable[int]) -> int:
        return sum(x)

    result = sum_values(processed)
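

This pattern isn't limited to @task-decorated functions, either -- the AIP 
also proposes the same partial/map pair for classic operators. A rough sketch 
assuming the proposed syntax (the task_id and commands here are made up):


from airflow.operators.bash import BashOperator

# partial() takes the arguments shared by every mapped instance, and map()
# the per-instance ones -- one BashOperator task instance per command
BashOperator.partial(task_id='run_command').map(
    bash_command=['echo a', 'echo b', 'echo c'],
)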


This API change is (hopefully) deceptively simple, but implementing it is not 
straightforward -- we've tried to go into a lot of detail in the AIP document:


AIP-42: Dynamic Task Mapping 
https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-42%3A+Dynamic+Task+Mapping


We look forward to discussing this further as this is a feature we’re all 
really excited about :).


Happy Friday!


Ash Berlin-Taylor, TP Chung, and Daniel Imberman
