Hello everyone!
I'd like to start a discussion about a new feature that we've been thinking about for a while, and that Ash mentioned in his Airflow Summit keynote: Dynamic Task Mapping -- having the number of tasks vary at run time based on the result of a previous step. The tl;dr is that it adds a form of "dynamism" to Airflow DAGs that wasn't previously possible, and opens up a lot of exciting new workflows.

A short example to whet your appetites:

    from typing import Iterable

    from airflow.decorators import task

    @task
    def my_task(x, y):
        return x + y

    @task
    def sum_values(x: Iterable[int]) -> int:
        return sum(x)

    add_values = [1, 2, 3]

    # One mapped task instance of my_task is created per element of add_values.
    added_values = my_task.partial(x=5).map(y=add_values)
    result = sum_values(added_values)

Now this example is trivial and doesn't need multiple tasks, but it shows the power of what we can do. At run time this would result in four task instances being run (three mapped copies of my_task, plus sum_values), and the end result is 21 (6 + 7 + 8).

A more useful example would be to use an S3PrefixSensor, then operate on each file in a separate task:

    # (plus the imports from the first example)
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook
    from airflow.providers.amazon.aws.sensors.s3_prefix import S3PrefixSensor

    with dag:  # assumes a DAG object defined earlier
        sensor = S3PrefixSensor(
            task_id='wait_for_files',
            bucket_name='test-airflow-bucket',
            prefix='vendor_a/incoming/{{ data_interval_start }}/',
        )

        @task
        def process_file(key: str):
            # Count the lines in each file the sensor matched.
            return len(S3Hook().read_key(key).splitlines())

        # sensor.output is the sensor's XCom: one mapped task instance per key.
        processed = process_file.map(key=sensor.output)

        @task
        def sum_values(x: Iterable[int]) -> int:
            return sum(x)

        result = sum_values(processed)

This API change is (hopefully) deceptively simple, but implementing it is not straightforward -- we've tried to go into lots of detail in the AIP document:

AIP-42: Dynamic Task Mapping - Airflow - Apache Software Foundation [https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-42%3A+Dynamic+Task+Mapping]

We look forward to discussing this further as this is a feature we're all really excited about :).

Happy Friday!

Ash Berlin-Taylor, TP Chung, and Daniel Imberman
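P.S. For anyone who wants a quick mental model of the partial/map/reduce flow above before diving into the AIP, here is a plain-Python analogy. This is a sketch of the semantics only -- no Airflow involved, and the names are ours purely for illustration. At run time each mapped call would of course become its own scheduled task instance, not a loop iteration in a single process.

    import functools

    # Plain functions standing in for the @task-decorated ones above.
    def my_task(x, y):
        return x + y

    def sum_values(xs):
        return sum(xs)

    add_values = [1, 2, 3]

    # ".partial(x=5)" pins the constant argument,
    pinned = functools.partial(my_task, x=5)

    # ".map(y=add_values)" fans out one call per element,
    added_values = [pinned(y=v) for v in add_values]  # -> [6, 7, 8]

    # and the downstream task acts as the "reduce" step.
    assert sum_values(added_values) == 21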