Exciting to see this! This opens a whole lot of new possibilities!

Will take a deeper look and get back to you with questions/feedback.

Cheers!

On Fri, Nov 5, 2021 at 12:10 PM Xiaodong Deng <xdd...@apache.org> wrote:

> Looks nice 👍 I do think this will allow much higher flexibility for
> users.
>
> Will have a look in detail as well, and get back to you if I have
> any questions/feedback.
>
> Thanks!
>
>
> XD
>
> On Fri, Nov 5, 2021 at 19:52 Jarek Potiuk <ja...@potiuk.com> wrote:
>
>> Exciting and yeah - I've heard MANY requests from our users for this
>> kind of feature!
>>
>> Looks good - I will take a close look over the weekend for sure :)
>>
>> On Fri, Nov 5, 2021 at 6:29 PM Kaxil Naik <kaxiln...@gmail.com> wrote:
>> >
>> > Well done folks, this is exciting and will solve a lot of the current
>> > issues with "Dynamic DAGs" (especially ones that need fan-out).
>> >
>> > On Fri, Nov 5, 2021 at 5:02 PM Daniel Imberman <daniel.imber...@gmail.com> wrote:
>> >>
>> >>
>> >> Hello everyone!
>> >>
>> >>
>> >> I'd like to start a discussion about a new feature that we've been
>> >> thinking about for a while, and that Ash mentioned in his Airflow
>> >> Summit keynote:
>> >>
>> >>
>> >> Dynamic Task Mapping -- having the number of tasks vary at run time
>> >> based on the result of a previous step
>> >>
>> >>
>> >> The tl;dr is that it adds a form of "dynamism" to Airflow DAGs that
>> >> wasn't previously possible, and opens up a lot of exciting new
>> >> workflows.
>> >>
>> >>
>> >> A short example to whet your appetites:
>> >>
>> >>
>> >> from typing import Iterable
>> >>
>> >> from airflow.decorators import task
>> >>
>> >> @task
>> >> def my_task(x, y):
>> >>     return x + y
>> >>
>> >> @task
>> >> def sum_values(x: Iterable[int]) -> int:
>> >>     return sum(x)
>> >>
>> >> add_values = [1, 2, 3]
>> >>
>> >> added_values = my_task.partial(x=5).map(y=add_values)
>> >> result = sum_values(added_values)
>> >>
>> >>
>> >> Now this example is trivial and doesn't need multiple tasks, but it
>> >> shows the power of what we can do. At run time this would result in 4
>> >> task instances being run (3 mapped my_task instances plus sum_values),
>> >> and the end result is 21.
>> >>
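>> >> For intuition, here is a minimal plain-Python sketch of those
>> >> semantics -- functools.partial is only an analogy, not how Airflow
>> >> would implement this:
>> >>
>> >> from functools import partial
>> >>
>> >> def my_task(x, y):
>> >>     return x + y
>> >>
>> >> # .partial(x=5) pins x; .map(y=[1, 2, 3]) fans out over y, creating
>> >> # one task instance per element (three here), plus sum_values = four:
>> >> add_five = partial(my_task, x=5)
>> >> added_values = [add_five(y=y) for y in [1, 2, 3]]  # [6, 7, 8]
>> >> result = sum(added_values)  # 21
>> >>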
>> >>
>> >> A more useful example would be to use an S3PrefixSensor, then operate
>> >> on each file in a separate task:
>> >>
>> >>
>> >> from typing import Iterable
>> >>
>> >> from airflow.decorators import task
>> >> from airflow.providers.amazon.aws.hooks.s3 import S3Hook
>> >> from airflow.providers.amazon.aws.sensors.s3_prefix import S3PrefixSensor
>> >>
>> >> with dag:
>> >>     sensor = S3PrefixSensor(
>> >>         task_id='wait_for_files',  # any unique task id
>> >>         bucket_name='test-airflow-bucket',
>> >>         prefix='vendor_a/incoming/{{ data_interval_start }}/',
>> >>     )
>> >>
>> >>     @task
>> >>     def process_file(key: str) -> int:
>> >>         return len(S3Hook().read_key(key).splitlines())
>> >>
>> >>     processed = process_file.map(key=sensor.output)
>> >>
>> >>     @task
>> >>     def sum_values(x: Iterable[int]) -> int:
>> >>         return sum(x)
>> >>
>> >>     result = sum_values(processed)
>> >>
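>> >> A minimal sketch of the run-time expansion, assuming the sensor had
>> >> pushed two matching keys to XCom (the keys and contents below are
>> >> hypothetical, and the dict stands in for S3Hook().read_key()):
>> >>
>> >> fake_s3 = {
>> >>     'vendor_a/incoming/2021-11-05/a.csv': 'r1\nr2',
>> >>     'vendor_a/incoming/2021-11-05/b.csv': 'r1\nr2\nr3',
>> >> }
>> >>
>> >> # One process_file task instance per matched key:
>> >> processed = [len(fake_s3[k].splitlines()) for k in fake_s3]  # [2, 3]
>> >> result = sum(processed)  # 5
>> >>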
>> >>
>> >> This API change is (hopefully) deceptively simple, but implementing it
>> >> is not straightforward -- we've tried to go into lots of detail in the
>> >> AIP document:
>> >>
>> >>
>> >> AIP-42: Dynamic Task Mapping - Airflow - Apache Software Foundation
>> >>
>> >>
>> >> We look forward to discussing this further as this is a feature we’re
>> >> all really excited about :).
>> >>
>> >>
>> >> Happy Friday!
>> >>
>> >>
>> >> Ash Berlin-Taylor, TP Chung, and Daniel Imberman
>>
>
