I took an initial deeper look and I have a few comments/questions:

Comments:

1) The scope looks great. I love that we focus on only one case
(map/reduce) first. There is always the possibility of implementing
more sophisticated and dynamic task creation later, but this looks
like the 20% of effort that solves 80% of the cases.
2) Choosing map() is a very good idea, and it follows the interface of
Python's map() (https://book.pythontips.com/en/latest/map_filter.html#map),
which is much better than the for/append alternative presented in the
document (I've put a rough sketch of the contrast right after this
list).
3) Similarly, modelling partial() on the functools one will make it
immediately familiar to Python developers who already know it:
https://docs.python.org/3/library/functools.html#functools.partial
4) Using Task Groups for depth-first execution sounds like a very good
idea. Keeping Task Group as a "UI concept only" is very limiting, and
it's about time we made it a "first-class citizen" for the scheduler
and especially for the executors as well. This would also make it
possible (as a separate change) to have nicer success/failure
callbacks and SLAs on task groups, for example (those currently
require some hacks).
5) I like that you added the "small" but necessary UI decisions -
paginating tasks sounds like a good idea. I think the details of
UI/navigation between the many mapped tasks should be well thought out
(I have some questions/proposals below):
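
To illustrate the contrast I mean in 2) and 3), here is a rough sketch
in plain Python (illustrative only - the proposed Airflow API itself
is shown in the examples quoted further down in this thread):

from functools import partial

def add(x, y):
    return x + y

# for/append style: imperative, more boilerplate
results = []
for y in [1, 2, 3]:
    results.append(add(5, y))

# map + partial style: declarative, and already familiar to Python
# developers - this is what my_task.partial(x=5).map(y=[1, 2, 3]) mirrors
results = list(map(partial(add, 5), [1, 2, 3]))
assert results == [6, 7, 8]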

Questions/proposals:

1) This is a bit underspecified - do we also allow generators as
mappable values? The example shows a List, the docs also mention
Dictionaries, and "maximum_map_size" especially assumes that we know
the size of the map. The question is: should we know the size upfront
and fail fast, or should we also allow generators and fail only when
we reach the maximum size? There is an exclusion for "infinite"
generators (obviously), but what about "finite" ones? That is not
entirely clear (and the answer might fall out of an implementation
detail of scheduling), but I wonder if you have thought about this and
have an answer. Allowing generators would remove a bit of boilerplate,
but it might not be needed at all - such a generator could simply be
materialised into a List and the list used afterwards (that has memory
implications, but maybe the implementation would not benefit from a
generator's lower memory use anyway). This is also different from the
"lazy lists" for the "reduce" part, as the mapped values will likely
be far smaller than the XCom results, so maybe it's not needed at all.
I've put a small sketch of what I mean at the end of this list.

2) For the UI part - I think we should consider "filtering" a
first-class citizen from day one. Filtering for "failed" mapped tasks
seems like a super-useful feature for operations people.

3) I am afraid that adding mapping_id to the primary key will not work
due to MySQL limitations (and I think you kind of expected there might
be problems with it). MySQL (our Elephant in the Room) has a very
limited maximum size for an index entry - we have already hit it with
utf8mb4 encoding. If we try to add a JSON mapping_id to the primary
key, exceeding that limit is virtually guaranteed; moreover, I believe
a JSON field cannot be added to any index (including the primary key)
on MySQL at all. You can create a generated column out of the JSON
column and add that to the index, but in our case we cannot assume the
structure of the JSON field, so we cannot derive the right generated
column from it:
https://stackoverflow.com/questions/38389075/how-to-create-index-on-json-column-in-mysq.
The document proposes an alternative task id override for this case,
but I agree it's messy. I think a better approach would be to generate
a unique hash of the JSON data and use it as part of the primary key,
or, even simpler, to add a "mapping_id" that is just the sequentially
increasing index in the map array (this sounds like the best and
simplest idea to me). Either way it should not be MySQL-specific - it
should be implemented the same way for Postgres, even though Postgres
has no problem indexing JSONB fields. There is a small sketch of the
two alternatives at the end of this list.
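
Re: question 1) - a plain-Python sketch of the list-vs-generator
distinction I mean (the maximum_map_size check here is hypothetical,
it is exactly the open design question):

MAXIMUM_MAP_SIZE = 1024  # hypothetical limit

def expand(mappable):
    # List/dict: the size is known upfront, so we can fail fast.
    if hasattr(mappable, "__len__"):
        if len(mappable) > MAXIMUM_MAP_SIZE:
            raise ValueError("maximum_map_size exceeded")
        return list(mappable)
    # Generator: we only learn the size while consuming it, so we can
    # either reject it outright or fail lazily once the limit is hit.
    out = []
    for item in mappable:
        out.append(item)
        if len(out) > MAXIMUM_MAP_SIZE:
            raise ValueError("maximum_map_size exceeded (found lazily)")
    return out

expand([1, 2, 3])                 # size known upfront, fail-fast possible
expand(i * 2 for i in range(3))   # finite generator, only checkable lazily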
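
Re: question 3) - a small sketch of the two alternatives (the names
and structure are purely illustrative, not the actual Airflow schema):

import hashlib
import json

mapping_value = {"key": "vendor_a/incoming/2021-11-05/part-0001.csv"}

# Alternative A: a fixed-length hash of the serialised JSON mapping
# value. 64 hex characters fit within MySQL's index-size limit and can
# be part of the primary key, unlike the JSON column itself.
mapping_hash = hashlib.sha256(
    json.dumps(mapping_value, sort_keys=True).encode("utf-8")
).hexdigest()

# Alternative B (the simpler one I prefer): just the position in the
# map array, so the primary key becomes something like
# (dag_id, task_id, run_id, mapping_id) with mapping_id = 0, 1, 2, ...
mapping_id = 0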

J.

On Sat, Nov 6, 2021 at 11:45 PM Xinbin Huang <b...@apache.org> wrote:
>
> Exciting to see this! This opens a whole lot of new possibilities!
>
> Will take a deeper look and get back to you for questions/feedback.
>
> Cheers!
>
> On Fri, Nov 5, 2021 at 12:10 PM Xiaodong Deng <xdd...@apache.org> wrote:
>>
>> Looks nice 👍 I do think this will allow much higher flexibility for
>> the users.
>>
>> Will have a look in detail as well, and get back to you if I have
>> any questions/feedback.
>>
>> Thanks!
>>
>>
>> XD
>>
>> On Fri, Nov 5, 2021 at 19:52 Jarek Potiuk <ja...@potiuk.com> wrote:
>>>
>>> Exciting and yeah - I've heard MANY requests from our users for this
>>> kind of feature!
>>>
>>> Looks good - I will take a close look over the weekend for sure :)
>>>
>>> On Fri, Nov 5, 2021 at 6:29 PM Kaxil Naik <kaxiln...@gmail.com> wrote:
>>> >
>>> > Well done folks, this is exciting and will solve a lot of the
>>> > current issues with "Dynamic DAGs" (especially ones that need
>>> > fan-out)
>>> >
>>> > On Fri, Nov 5, 2021 at 5:02 PM Daniel Imberman 
>>> > <daniel.imber...@gmail.com> wrote:
>>> >>
>>> >>
>>> >> Hello everyone!
>>> >>
>>> >>
>>> >> I'd like to start a discussion about a new feature that we've been 
>>> >> thinking about for a while, and that Ash mentioned in his Airflow Summit 
>>> >> keynote:
>>> >>
>>> >>
>>> >> Dynamic Task Mapping -- having the number of tasks vary at run time 
>>> >> based on the result of a previous step
>>> >>
>>> >>
>>> >> The tl;dr of this is that it adds a form of "dynamism" to Airflow DAGs 
>>> >> that wasn't previously possible, and opens up a lot of exciting new 
>>> >> workflows for DAGs.
>>> >>
>>> >>
>>> >> A short example to whet your appetites:
>>> >>
>>> >>
>>> >> @task
>>> >> def my_task(x, y):
>>> >>     return x + y
>>> >>
>>> >> @task
>>> >> def sum_values(x: Iterable[int]) -> int:
>>> >>     return sum(x)
>>> >>
>>> >> add_values = [1, 2, 3]
>>> >>
>>> >> added_values = my_task.partial(x=5).map(y=add_values)
>>> >> result = sum_values(added_values)
>>> >>
>>> >>
>>> >> Now this example is trivial and doesn't need multiple tasks, but it 
>>> >> shows the power of what we can do. At run time this would result in 4 
>>> >> task instances being run, and the end result is 21.
>>> >>
>>> >>
>>> >> A more useful example would be to use an S3PrefixSensor, then operate on 
>>> >> each file in a separate task:
>>> >>
>>> >>
>>> >> with dag:
>>> >>     sensor = S3PrefixSensor(
>>> >>         bucket_name='test-airflow-bucket',
>>> >>         prefix='vendor_a/incoming/{{ data_interval_start }}/',
>>> >>     )
>>> >>
>>> >>     @task
>>> >>     def process_file(key: str):
>>> >>         return len(S3Hook().read_key(key).splitlines())
>>> >>
>>> >>     processed = process_file.map(key=sensor.output)
>>> >>
>>> >>     @task
>>> >>     def sum_values(x: Iterable[int]) -> int:
>>> >>         return sum(x)
>>> >>
>>> >>     result = sum_values(processed)
>>> >>
>>> >>
>>> >> This API change is (hopefully) deceptively simple, but implementing it 
>>> >> is not straightforward -- we've tried to go into lots of detail in the 
>>> >> AIP document:
>>> >>
>>> >>
>>> >> AIP-42: Dynamic Task Mapping - Airflow - Apache Software Foundation
>>> >>
>>> >>
>>> >> We look forward to discussing this further as this is a feature we’re 
>>> >> all really excited about :).
>>> >>
>>> >>
>>> >> Happy Friday!
>>> >>
>>> >>
>>> >> Ash Berlin-Taylor, TP Chung, and Daniel Imberman
