Exciting and yeah - I've heard MANY requests from our users for this
kind of feature!

Looks good - I will take a close look over the weekend for sure :)

On Fri, Nov 5, 2021 at 6:29 PM Kaxil Naik <kaxiln...@gmail.com> wrote:
>
> Well done folks, this is exciting and will solve a lot of the current issues
> with "Dynamic DAGs" (especially ones that need fan-out)
>
> On Fri, Nov 5, 2021 at 5:02 PM Daniel Imberman <daniel.imber...@gmail.com> 
> wrote:
>>
>>
>> Hello everyone!
>>
>>
>> I'd like to start a discussion about a new feature that we've been thinking 
>> about for a while, and that Ash mentioned in his Airflow Summit keynote:
>>
>>
>> Dynamic Task Mapping -- having the number of tasks vary at run time based on 
>> the result of a previous step
>>
>>
>> The tl;dr of this is that it adds a form of "dynamism" to Airflow DAGs that
>> wasn't previously possible, and opens up a lot of exciting new workflows for
>> DAGs.
>>
>>
>> A short example to whet your appetites:
>>
>>
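>> from typing import Iterable
>>
>> from airflow.decorators import task
>>
>>
>> @task
>> def my_task(x, y):
>>     return x + y
>>
>>
>> @task
>> def sum_values(x: Iterable[int]) -> int:
>>     return sum(x)
>>
>>
>> add_values = [1, 2, 3]
>>
>> # Proposed API: hold x=5 fixed and create one task instance per element
>> # of add_values.
>> added_values = my_task.partial(x=5).map(y=add_values)
>> result = sum_values(added_values)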
>>
>>
>> Now this example is trivial and doesn't need multiple tasks, but it shows
>> the power of what we can do. At run time this would result in 4 task
>> instances being run (three mapped my_task instances plus one sum_values),
>> and the end result is 21 (6 + 7 + 8).
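A minimal pure-Python sketch of the semantics described above, assuming that `partial(x=5).map(y=...)` means "hold x fixed and call the function once per element of y". `simulate_map` is a hypothetical helper for illustration only, not part of Airflow's API; each element of its result stands in for one mapped task instance.

```python
from functools import partial
from typing import Callable, Iterable, List


def my_task(x: int, y: int) -> int:
    return x + y


def sum_values(x: Iterable[int]) -> int:
    return sum(x)


def simulate_map(fn: Callable[..., int], fixed: dict, arg_name: str,
                 values: List[int]) -> List[int]:
    """Hypothetical stand-in for fn.partial(**fixed).map(arg_name=values).

    The fixed kwargs are bound once; fn is then called once per element of
    values, and each call represents one mapped task instance.
    """
    bound = partial(fn, **fixed)
    return [bound(**{arg_name: v}) for v in values]


add_values = [1, 2, 3]
mapped_results = simulate_map(my_task, {"x": 5}, "y", add_values)
result = sum_values(mapped_results)
# Three mapped my_task calls plus one sum_values call.
task_instances = len(mapped_results) + 1
print(mapped_results, result, task_instances)
```

Running it prints `[6, 7, 8] 21 4`, matching the four task instances and the total of 21.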
>>
>>
>> A more useful example would be to use an S3PrefixSensor, then operate on 
>> each file in a separate task:
>>
>>
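>> from typing import Iterable
>>
>> from airflow.decorators import task
>> from airflow.providers.amazon.aws.hooks.s3 import S3Hook
>> from airflow.providers.amazon.aws.sensors.s3_prefix import S3PrefixSensor
>>
>> # `dag` is an existing DAG object (definition not shown).
>> with dag:
>>     # Every operator needs a task_id; this one is illustrative.
>>     sensor = S3PrefixSensor(
>>         task_id='wait_for_vendor_a_files',
>>         bucket_name='test-airflow-bucket',
>>         prefix='vendor_a/incoming/{{ data_interval_start }}/',
>>     )
>>
>>     @task
>>     def process_file(key: str) -> int:
>>         # Count the lines in a single file; one task instance per key.
>>         return len(S3Hook().read_key(key).splitlines())
>>
>>     # Proposed: fan out over the keys the sensor found at run time.
>>     processed = process_file.map(key=sensor.output)
>>
>>     @task
>>     def sum_values(x: Iterable[int]) -> int:
>>         return sum(x)
>>
>>     result = sum_values(processed)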
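The same fan-out can be sketched without S3, assuming the sensor's output is simply the list of keys under the prefix. `FAKE_BUCKET`, `list_keys`, and the sample keys and file contents are hypothetical stand-ins for the bucket, the sensor, and `S3Hook().read_key`; the point is that the number of `process_file` calls is set by how many keys exist at run time, not by anything written in the DAG file.

```python
from typing import Dict, Iterable, List

# Hypothetical in-memory stand-in for the S3 bucket: key -> file contents.
FAKE_BUCKET: Dict[str, str] = {
    "vendor_a/incoming/2021-11-05/a.csv": "id,value\n1,10\n2,20\n",
    "vendor_a/incoming/2021-11-05/b.csv": "id,value\n3,30\n",
    "vendor_b/incoming/2021-11-05/c.csv": "id,value\n4,40\n",
}


def list_keys(prefix: str) -> List[str]:
    """Stand-in for the sensor's output: every key under the prefix."""
    return sorted(k for k in FAKE_BUCKET if k.startswith(prefix))


def process_file(key: str) -> int:
    """Stand-in for process_file: count the lines in one file."""
    return len(FAKE_BUCKET[key].splitlines())


def sum_values(x: Iterable[int]) -> int:
    return sum(x)


# The key list is only known once this runs, i.e. at "run time".
keys = list_keys("vendor_a/incoming/2021-11-05/")
# One simulated mapped task instance per key.
processed = [process_file(k) for k in keys]
result = sum_values(processed)
print(len(keys), processed, result)
```

With the two hypothetical vendor_a files it prints `2 [3, 2] 5`; a third matching key would produce a third `process_file` call without any change to the code.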
>>
>>
>> This API change is (hopefully) deceptively simple, but implementing it is
>> not straightforward; we've tried to go into lots of detail in the AIP
>> document:
>>
>>
>> AIP-42: Dynamic Task Mapping - Airflow - Apache Software Foundation
>>
>>
>> We look forward to discussing this further as this is a feature we’re all 
>> really excited about :).
>>
>>
>> Happy Friday!
>>
>>
>> Ash Berlin-Taylor, TP Chung, and Daniel Imberman
