Exciting to see this! This opens up a whole lot of new possibilities! Will take a deeper look and get back to you with questions/feedback.
Cheers!

On Fri, Nov 5, 2021 at 12:10 PM Xiaodong Deng <xdd...@apache.org> wrote:

> Looks nice 👍 I do think this will allow much greater flexibility for the
> users.
>
> Will have a look at the details as well, and get back to you if I have
> any questions/feedback.
>
> Thanks!
>
> XD
>
> On Fri, Nov 5, 2021 at 19:52 Jarek Potiuk <ja...@potiuk.com> wrote:
>
>> Exciting, and yeah - I've heard MANY requests from our users for this
>> kind of feature!
>>
>> Looks good - I will take a close look over the weekend for sure :)
>>
>> On Fri, Nov 5, 2021 at 6:29 PM Kaxil Naik <kaxiln...@gmail.com> wrote:
>>
>>> Well done folks, this is exciting and will solve a lot of the current
>>> issues with "Dynamic DAGs" (especially ones that need fan-out).
>>>
>>> On Fri, Nov 5, 2021 at 5:02 PM Daniel Imberman <daniel.imber...@gmail.com> wrote:
>>>
>>>> Hello everyone!
>>>>
>>>> I'd like to start a discussion about a new feature that we've been
>>>> thinking about for a while, and that Ash mentioned in his Airflow Summit
>>>> keynote:
>>>>
>>>> Dynamic Task Mapping -- having the number of tasks vary at run time
>>>> based on the result of a previous step.
>>>>
>>>> The tl;dr of this is that it adds a form of "dynamism" to Airflow DAGs
>>>> that wasn't previously possible, and opens up a lot of exciting new
>>>> workflows for DAGs.
>>>>
>>>> A short example to whet your appetites:
>>>>
>>>> @task
>>>> def my_task(x, y):
>>>>     return x + y
>>>>
>>>> @task
>>>> def sum_values(x: Iterable[int]) -> int:
>>>>     return sum(x)
>>>>
>>>> add_values = [1, 2, 3]
>>>>
>>>> added_values = my_task.partial(x=5).map(y=add_values)
>>>> result = sum_values(added_values)
>>>>
>>>> Now this example is trivial and doesn't need multiple tasks, but it
>>>> shows the power of what we can do. At run time this would result in
>>>> four task instances being run, and the end result is 21.
>>>>
>>>> A more useful example would be to use an S3PrefixSensor, then operate
>>>> on each file in a separate task:
>>>>
>>>> with dag:
>>>>     sensor = S3PrefixSensor(
>>>>         task_id='wait_for_files',  # task_id added; required by the operator API
>>>>         bucket_name='test-airflow-bucket',
>>>>         prefix='vendor_a/incoming/{{ data_interval_start }}/',
>>>>     )
>>>>
>>>>     @task
>>>>     def process_file(key: str):
>>>>         return len(S3Hook().read_key(key).splitlines())
>>>>
>>>>     processed = process_file.map(key=sensor.output)
>>>>
>>>>     @task
>>>>     def sum_values(x: Iterable[int]) -> int:
>>>>         return sum(x)
>>>>
>>>>     result = sum_values(processed)
>>>>
>>>> This API change is (hopefully) deceptively simple, but implementing it
>>>> is not straightforward -- we've tried to go into lots of detail in the
>>>> AIP document:
>>>>
>>>> AIP-42: Dynamic Task Mapping - Airflow - Apache Software Foundation
>>>>
>>>> We look forward to discussing this further as this is a feature we're
>>>> all really excited about :).
>>>>
>>>> Happy Friday!
>>>>
>>>> Ash Berlin-Taylor, TP Chung, and Daniel Imberman
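
For anyone who wants to try the idea end to end, here is a minimal, self-contained sketch of the first example written out as a complete DAG file. The dag_id, start_date, and schedule below are illustrative assumptions, not part of the proposal; the .partial()/.map() names follow this thread as written. When the feature later shipped in Airflow 2.3, the mapping call was renamed to .expand(), so substitute that to run this file on a released version.

from datetime import datetime
from typing import Iterable

from airflow.decorators import dag, task


# Illustrative dag_id/start_date/schedule; only the .partial()/.map() calls
# come from the proposal itself (released Airflow 2.3+ spells .map as .expand).
@dag(
    dag_id="dynamic_mapping_demo",
    start_date=datetime(2021, 11, 5),
    schedule_interval=None,
)
def dynamic_mapping_demo():
    @task
    def my_task(x, y):
        return x + y

    @task
    def sum_values(x: Iterable[int]) -> int:
        return sum(x)

    # Three mapped my_task instances (5+1, 5+2, 5+3) plus one sum_values
    # instance: four task instances in total, and sum_values returns 21.
    add_values = [1, 2, 3]
    added_values = my_task.partial(x=5).map(y=add_values)
    sum_values(added_values)


dynamic_mapping_demo()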