Exciting and yeah - I've heard MANY requests from our users for this kind of feature!
Looks good - I will take a close look over the weekend for sure :)

On Fri, Nov 5, 2021 at 6:29 PM Kaxil Naik <kaxiln...@gmail.com> wrote:
>
> Well done folks, this is exciting and will solve a lot of the current issues
> with "Dynamic DAGs" (especially ones that need fan-out).
>
> On Fri, Nov 5, 2021 at 5:02 PM Daniel Imberman <daniel.imber...@gmail.com>
> wrote:
>>
>> Hello everyone!
>>
>> I'd like to start a discussion about a new feature that we've been thinking
>> about for a while, and that Ash mentioned in his Airflow Summit keynote:
>>
>> Dynamic Task Mapping -- having the number of tasks vary at run time based on
>> the result of a previous step.
>>
>> The tl;dr of this is that it adds a form of "dynamism" to Airflow DAGs that
>> wasn't previously possible, and opens up a lot of exciting new workflows for
>> DAGs.
>>
>> A short example to whet your appetites:
>>
>>     from typing import Iterable
>>
>>     from airflow.decorators import task
>>
>>     @task
>>     def my_task(x, y):
>>         return x + y
>>
>>     @task
>>     def sum_values(x: Iterable[int]) -> int:
>>         return sum(x)
>>
>>     add_values = [1, 2, 3]
>>
>>     added_values = my_task.partial(x=5).map(y=add_values)
>>     result = sum_values(added_values)
>>
>> Now this example is trivial and doesn't need multiple tasks, but it shows
>> the power of what we can do. At run time this would result in 4 task
>> instances being run, and the end result is 21.
>>
>> A more useful example would be to use an S3PrefixSensor, then operate on
>> each file in a separate task:
>>
>>     with dag:
>>         sensor = S3PrefixSensor(
>>             bucket_name='test-airflow-bucket',
>>             prefix='vendor_a/incoming/{{ data_interval_start }}/',
>>         )
>>
>>         @task
>>         def process_file(key: str):
>>             return len(S3Hook().read_key(key).splitlines())
>>
>>         processed = process_file.map(key=sensor.output)
>>
>>         @task
>>         def sum_values(x: Iterable[int]) -> int:
>>             return sum(x)
>>
>>         result = sum_values(processed)
>>
>> This API change is (hopefully) deceptively simple, but implementing it is
>> not straightforward -- we've tried to go into lots of detail in the AIP
>> document:
>>
>> AIP-42: Dynamic Task Mapping - Airflow - Apache Software Foundation
>>
>> We look forward to discussing this further as this is a feature we’re all
>> really excited about :).
>>
>> Happy Friday!
>>
>> Ash Berlin-Taylor, TP Chung, and Daniel Imberman
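To make the proposed `partial(...).map(...)` semantics concrete, here is a minimal pure-Python sketch with no Airflow involved. The `MappedTask` class is a hypothetical stand-in that only mimics the behaviour described in the proposal: arguments bound with `partial` are shared by every instance, `map` runs one instance per element, and the results are collected into a list for a downstream reducer. It is not the AIP-42 implementation or Airflow's API. The in-memory `fake_bucket` dict and the `list_keys` helper are assumptions standing in for S3 and for the sensor's output.

```python
from typing import Any, Callable, Dict, Iterable, List


class MappedTask:
    """Hypothetical stand-in for a mappable task; not Airflow's API."""

    def __init__(self, fn: Callable[..., Any], **fixed: Any) -> None:
        self.fn = fn
        self.fixed = fixed  # arguments shared by every mapped instance

    def partial(self, **kwargs: Any) -> "MappedTask":
        # Return a new task with the extra arguments bound as fixed values.
        return MappedTask(self.fn, **{**self.fixed, **kwargs})

    def map(self, **mapped: Iterable[Any]) -> List[Any]:
        # This sketch only supports mapping over a single argument.
        if len(mapped) != 1:
            raise ValueError("this sketch maps over exactly one argument")
        [(name, values)] = mapped.items()
        # One "task instance" per element; results are collected in order.
        return [self.fn(**self.fixed, **{name: v}) for v in values]


def my_task(x: int, y: int) -> int:
    return x + y


def sum_values(x: Iterable[int]) -> int:
    return sum(x)


# First example: three mapped instances (6, 7, 8) feeding one reducer.
added_values = MappedTask(my_task).partial(x=5).map(y=[1, 2, 3])
result = sum_values(added_values)
print(added_values, result)  # [6, 7, 8] 21

# Second example, with a dict standing in for the S3 bucket (assumption).
fake_bucket: Dict[str, str] = {
    "vendor_a/incoming/2021-11-05/a.csv": "r1\nr2\nr3",
    "vendor_a/incoming/2021-11-05/b.csv": "r1",
}


def list_keys(prefix: str) -> List[str]:
    # Plays the role of the sensor's output: the keys found under the prefix.
    return sorted(k for k in fake_bucket if k.startswith(prefix))


def process_file(key: str) -> int:
    # Count lines in one "file", like the S3Hook().read_key(...) example.
    return len(fake_bucket[key].splitlines())


processed = MappedTask(process_file).map(key=list_keys("vendor_a/incoming/2021-11-05/"))
print(processed, sum_values(processed))  # [3, 1] 4
```

In the first run there are three mapped `my_task` instances plus one `sum_values` instance, which matches the "4 task instances" and the result of 21 described in the original mail. The second run shows the part a static DAG cannot express: the number of `process_file` instances depends on how many keys exist when the upstream step runs.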