Looks nice👍 I do think this will allow much higher flexibility for the
users.

Will have a look in detail as well, and get back to you if I have
any questions/feedback.

Thanks!


XD

On Fri, Nov 5, 2021 at 19:52 Jarek Potiuk wrote:

> Exciting and yeah - I've heard MANY requests from our users for this
> kind of feature!
>
> Looks good - I will take a close look over the weekend for sure :)
>
> On Fri, Nov 5, 2021 at 6:29 PM Kaxil Naik wrote:
> >
> > Well done folks, this is exciting and will solve a lot of the current
> issues with "Dynamic DAGs" (especially ones that need fan-out)
> >
> > On Fri, Nov 5, 2021 at 5:02 PM Daniel Imberman wrote:
> >>
> >>
> >> Hello everyone!
> >>
> >>
> >> I'd like to start a discussion about a new feature that we've been
> thinking about for a while, and that Ash mentioned in his Airflow Summit
> keynote:
> >>
> >>
> >> Dynamic Task Mapping -- having the number of tasks vary at run time
> based on the result of a previous step
> >>
> >>
> >> The tl;dr of this is that it adds a form of "dynamism" to Airflow DAGs
> that wasn't previously possible, and opens up a lot of exciting new
> workflows for DAGs.
> >>
> >>
> >> A short example to whet your appetites:
> >>
> >>
> >> from typing import Iterable
> >>
> >> from airflow.decorators import task
> >>
> >>
> >> @task
> >> def my_task(x, y):
> >>     return x + y
> >>
> >>
> >> @task
> >> def sum_values(x: Iterable[int]) -> int:
> >>     return sum(x)
> >>
> >>
> >> add_values = [1, 2, 3]
> >>
> >> # Proposed API: fix x=5, then create one mapped instance per value of y
> >> added_values = my_task.partial(x=5).map(y=add_values)
> >> result = sum_values(added_values)
> >>
> >>
> >> Now this example is trivial and doesn't need multiple tasks, but it
> shows the power of what we can do. At run time this would result in 4 task
> instances being run (three mapped my_task instances plus one sum_values),
> and the end result is 21.
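To make that run-time arithmetic concrete, here is a minimal plain-Python model of the proposed partial/map semantics. It does not use Airflow; `mapped_call` is a hypothetical helper standing in for the scheduler, under the assumption that `partial` fixes keyword arguments and `map` supplies one value per task instance. `functools.partial` is used only as an analogy for the fixed arguments.

```python
from functools import partial
from typing import Callable, Iterable, List


def my_task(x: int, y: int) -> int:
    return x + y


def sum_values(x: Iterable[int]) -> int:
    return sum(x)


def mapped_call(fn: Callable[..., int], fixed: dict, **mapped: Iterable[int]) -> List[int]:
    """Hypothetical stand-in for fn.partial(**fixed).map(**mapped).

    Only a single mapped argument is modelled: one call (one "task
    instance") is made per element of that argument.
    """
    (name, values), = mapped.items()  # exactly one mapped kwarg expected
    bound = partial(fn, **fixed)      # the fixed ("partial") arguments
    return [bound(**{name: v}) for v in values]


add_values = [1, 2, 3]
added_values = mapped_call(my_task, {"x": 5}, y=add_values)
result = sum_values(added_values)

# Three mapped instances plus one summing instance.
task_instances = len(added_values) + 1
print(added_values, result, task_instances)  # [6, 7, 8] 21 4
```

Because the mapped results come back as an ordinary list, the reduce step is a normal call; in the proposal, sum_values likewise receives the mapped results as an iterable.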
> >>
> >>
> >> A more useful example would be to use an S3PrefixSensor, then operate
> on each file in a separate task:
> >>
> >>
> >> from typing import Iterable
> >>
> >> from airflow.decorators import task
> >> from airflow.providers.amazon.aws.hooks.s3 import S3Hook
> >> from airflow.providers.amazon.aws.sensors.s3_prefix import S3PrefixSensor
> >>
> >> with dag:
> >>     sensor = S3PrefixSensor(
> >>         bucket_name='test-airflow-bucket',
> >>         prefix='vendor_a/incoming/{{ data_interval_start }}/',
> >>     )
> >>
> >>     @task
> >>     def process_file(key: str):
> >>         return len(S3Hook().read_key(key).splitlines())
> >>
> >>     # Proposed API: one process_file instance per key in the sensor's output
> >>     processed = process_file.map(key=sensor.output)
> >>
> >>     @task
> >>     def sum_values(x: Iterable[int]) -> int:
> >>         return sum(x)
> >>
> >>     result = sum_values(processed)
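The fan-out/fan-in shape of this example can be modelled without Airflow or S3. In the sketch below, `fake_bucket` and `list_keys` are hypothetical substitutes for the bucket contents and the sensor's output (the proposal assumes sensor.output yields the matching keys). The point is that the number of process_file calls is decided by data that only exists at run time.

```python
from typing import Dict, Iterable, List

# Hypothetical in-memory stand-in for the objects under the S3 prefix.
fake_bucket: Dict[str, str] = {
    "vendor_a/incoming/2021-11-05/a.csv": "r1\nr2\nr3",
    "vendor_a/incoming/2021-11-05/b.csv": "r1\nr2",
}


def list_keys(prefix: str) -> List[str]:
    """Upstream step (hypothetical): how many keys exist is only known at run time."""
    return sorted(k for k in fake_bucket if k.startswith(prefix))


def process_file(key: str) -> int:
    """Mapped step: one call per key, counting lines as in the example."""
    return len(fake_bucket[key].splitlines())


def sum_values(x: Iterable[int]) -> int:
    """Reduce step: receives the collected results of every mapped call."""
    return sum(x)


keys = list_keys("vendor_a/incoming/2021-11-05/")
processed = [process_file(k) for k in keys]  # fan-out: len(keys) calls
result = sum_values(processed)               # fan-in
print(len(keys), processed, result)  # 2 [3, 2] 5
```

Adding a third file to `fake_bucket` would produce a third process_file call with no change to the code, which is the kind of dynamism the proposal is after.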
> >>
> >>
> >> This API change is (hopefully) deceptively simple, but implementing it
> is not straightforward -- we've tried to go into lots of detail in the AIP
> document:
> >>
> >>
> >> AIP-42: Dynamic Task Mapping - Airflow - Apache Software Foundation
> >>
> >>
> >> We look forward to discussing this further as this is a feature we’re
> all really excited about :).
> >>
> >>
> >> Happy Friday!
> >>
> >>
> >> Ash Berlin-Taylor, TP Chung, and Daniel Imberman
>
