1) This is a bit underspecified - do we also allow generators as
mappable values? The examples show Lists, the docs also mention
Dictionaries, and "maximum_map_size" in particular assumes that we
know the size of the map (but the question is: should we know it
upfront and fail fast, or should we also allow generators and fail
when we reach the maximum size?). There is an exclusion for "infinite"
generators (obviously), but what about "finite" ones? That is not
entirely clear (and might come down to an implementation detail of
scheduling), but I wonder if you have thought about this and have an
answer. Allowing generators would mean a bit less boilerplate, but it
might not be needed at all: such a generator could be used to create a
List, and the List used afterwards (this has memory implications, but
maybe the implementation will not benefit from a generator's
potentially lower memory use anyway). This is different from the "lazy
lists" in the "reduce" part, as the "mapped values" will likely be
much smaller than XCom results, so maybe it's not needed at all.

Yes and no. In most realistic cases the mapping will be performed over a value from XCom, so any generator would be consumed before being serialized to XCom.

Similar for mapping over a literal -- that would be consumed and expanded to a list at DAG parse time.

So yes, you can use a generator literal, but it will be evaluated to completion at DAG parse time.
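To illustrate what "evaluated to completion at DAG parse time" would mean in practice, here is a plain-Python sketch (not actual Airflow code; the helper name is invented):

```python
# Hypothetical sketch of how DAG parsing might normalize a mappable value:
# lists and dicts pass through unchanged, while a generator is drained
# eagerly into a list -- which is why an infinite generator can't work.
def expand_mappable(value):
    if isinstance(value, (list, dict)):
        return value
    # Any other iterable (e.g. a generator) is consumed right here,
    # at parse time, before anything is scheduled.
    return list(value)

gen = (n * n for n in range(4))
print(expand_mappable(gen))  # the generator is fully consumed: [0, 1, 4, 9]
```

The point is just that a generator literal buys no laziness here: by the time the DAG is parsed, it has already become a plain list.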

2) For the UI part - I think we should consider "filtering" from day
one, as a first-class citizen. Filtering for "failed" tasks seems like
a super-useful feature for operations people.

Yes, Brent and I have thought about that -- it's somewhat orthogonal to this AIP, in that the AIP doesn't depend on filtering in the UI, and there are cases where filtering would already be useful today. But we will look at doing it as part of this project.

3) I am afraid that adding mapping_id to the primary key will not work
due to MySQL limitations (and I think you expected there might be
problems with it). There is a problem with the primary key/index for
MySQL (our elephant in the room). MySQL has a very limited maximum
index entry size (we've already hit it with utf8mb4 encoding). If we
try to add a JSONB mapping_id to the primary key, it virtually
guarantees the size will be exceeded; I also believe a JSONB field
cannot be added to any index (including the primary key) on MySQL.
You can create a generated column out of the JSONB column and add that
to the index, but in our case I believe we cannot assume the structure
of the JSONB field, so we cannot derive the right generated column
from it:
<https://stackoverflow.com/questions/38389075/how-to-create-index-on-json-column-in-mysq>.
In the document, an alternative task id override was proposed for this
case, but I agree it's messy. I think a better approach would be to
generate a unique hash of the JSON data as part of the primary key
instead, or even to add a "mapping_id" which is a simple, sequentially
increasing index into the map array (this sounds like the best and
simplest idea to me). But it should not be MySQL-specific - it should
be implemented the same way for Postgres, even though Postgres has no
problem indexing JSONB fields.
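For concreteness, the "unique hash of the JSON data" idea could be sketched roughly like this (function name and canonicalization choices are illustrative, not from the AIP):

```python
import hashlib
import json

def mapping_key(value):
    """Hypothetical sketch: serialize the mapped value to canonical JSON
    (sorted keys, fixed separators) so equal values always produce the
    same bytes, then hash to a fixed-width string that fits comfortably
    within MySQL's index-entry size limit."""
    canonical = json.dumps(value, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

# Equal dicts hash identically regardless of key order:
print(mapping_key({"a": 1, "b": 2}) == mapping_key({"b": 2, "a": 1}))  # True
```

A fixed 64-character hex digest sidesteps the index-size problem, at the cost of the key no longer being human-readable -- one reason the sequential index may be the simpler option.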

Yeah, looking at this some more, it also appears that MySQL 5.x doesn't allow defaults on JSON columns either (8 does), so we'll have to come up with a new plan.

One option we do have to reduce the index length of the PK is to use the (already existing) auto-increment `id` column from the dag_run table -- so the PK on task_instance could be changed from (dag_id, run_id, task_id) to (dag_run_id, task_id). This would reduce the index size from ~2k bytes (250 chars × 3 bytes per char × 3 fields) to just over 758 (one field plus an integer).

We'd have to look at query patterns, and decide if we should keep the table denormalized and maintain dag_id and run_id columns on the task_instance or not.

Daniel edited the AIP last night to add a new "TaskMapping" table. The original goal there was to store info about XCom values so that when the scheduler performs the expansion of tasks it doesn't need to fetch the (possibly very large) value from a custom XCom backend just to get the length of a list or the keys of a dict. We'll give this some thought and see how we can use this new table to store just an "index" to the entry.

Roughly what I'm proposing is that the PK on the task_instance table becomes something like (dag_run_id, task_id, mapping_index), where mapping_index is an array index into a JSON value in a row of the new task_mapping table. And since primary key columns can't be nullable, and arrays are zero-indexed, we'd have to use -1 as the "not mapped" value.
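A minimal sketch of that key shape, using stdlib sqlite3 purely for illustration (column names are assumed from this discussion, not from the AIP):

```python
import sqlite3

# Hypothetical sketch of the proposed task_instance primary key:
# (dag_run_id, task_id, mapping_index), with mapping_index = -1 marking
# an unmapped task, since PK columns can't be NULL and real array
# indexes start at 0.
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE task_instance (
        dag_run_id    INTEGER NOT NULL,
        task_id       TEXT    NOT NULL,
        mapping_index INTEGER NOT NULL DEFAULT -1,
        PRIMARY KEY (dag_run_id, task_id, mapping_index)
    )
    """
)
# One unmapped task and three expansions of a mapped task coexist
# in the same run without colliding:
rows = [
    (1, "extract", -1),
    (1, "process", 0),
    (1, "process", 1),
    (1, "process", 2),
]
conn.executemany("INSERT INTO task_instance VALUES (?, ?, ?)", rows)
print(conn.execute("SELECT COUNT(*) FROM task_instance").fetchone()[0])  # 4
```

The composite key keeps every index column a fixed-width integer or a short string, which is the whole point of avoiding JSON in the PK.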

Does that make any sense?

There's still a bit more info to work out, such as how we find the right row in task_mapping table, as it is associated with the _parent_ task, not the mapped task itself.

Oh, it gets a bit more complicated actually! It is possible to do `task.map(x=list1, y=list2)` to map over two lists at once (making a cartesian product), and in this case the mapping would be unique per _consuming_ task.
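The cartesian-product semantics described above can be sketched in plain Python (illustrative only; the real expansion rules are defined by the AIP, not this snippet):

```python
from itertools import product

# Mapping over two lists at once, task.map(x=list1, y=list2), would mean
# one task instance per (x, y) combination -- so the number and order of
# expansions only make sense relative to the consuming task.
list1 = [1, 2]
list2 = ["a", "b", "c"]
expansions = [{"x": x, "y": y} for x, y in product(list1, list2)]
print(len(expansions))  # 2 * 3 = 6 task instances
```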

Okay, so a bit more thought required on the details here but I think the overall plan is workable.

-ash





On Sun, Nov 7 2021 at 15:57:02 +0100, Jarek Potiuk <ja...@potiuk.com> wrote:
I took an initial deeper look and I have a few comments/questions:

Comments:

1) The scope looks great. I love that we only focus on one case
(map/reduce) first. There is always the possibility of implementing
more sophisticated and dynamic task creation later, but this one looks
like 20% of the effort solving 80% of the cases.
2) Choosing map() is a very good idea, and following the interface of
Python's built-in map()
<https://book.pythontips.com/en/latest/map_filter.html#map> is much
better than the for/append presented as an alternative.
3) Similarly, having partial follow the functools one
<https://docs.python.org/3/library/functools.html#functools.partial>
will make it familiar to all Python developers who already know it.
4) Using Task Groups for depth-first execution sounds like a very good
idea. I think keeping Task Groups as a "UI concept only" is very
limiting, and it's about time we made them first-class citizens for
the scheduler, and especially the executors, as well. This will make
it possible (as a separate change) to have nicer success/failure
callbacks and SLAs on task groups, for example (those now require some
hacks).
5) I like that you added the "small" but necessary UI decisions -
paginating tasks sounds like a good idea. I think some details of the
UI/navigation between multiple tasks should be well thought out (I
have some questions/proposals on this).
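Points 2 and 3 above can be illustrated in plain Python. This is just the built-in analogy, not the actual Airflow API: `my_task.partial(x=5).map(y=[1, 2, 3])` from the AIP examples corresponds roughly to:

```python
from functools import partial

def my_task(x, y):
    return x + y

# partial() fixes the constant argument, just as .partial(x=5) would...
bound = partial(my_task, x=5)

# ...and mapping expands the bound callable over each value of y,
# just as .map(y=[1, 2, 3]) would create one task instance per value.
results = [bound(y=y) for y in [1, 2, 3]]
print(results)  # [6, 7, 8]
```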

J.

On Sat, Nov 6, 2021 at 11:45 PM Xinbin Huang <b...@apache.org> wrote:

 Exciting to see this! This opens a whole lot of new possibilities!

 Will take a deeper look and get back to you for questions/feedback.

 Cheers!

On Fri, Nov 5, 2021 at 12:10 PM Xiaodong Deng <xdd...@apache.org> wrote:

Looks nice👍 I do think this will allow much higher flexibility for the users.

Will have a look at the details as well, and get back to you if I have any questions/feedback.

 Thanks!


 XD

On Fri, Nov 5, 2021 at 19:52 Jarek Potiuk <ja...@potiuk.com> wrote:

Exciting and yeah - I've heard MANY requests from our users for this
 kind of feature!

 Looks good - I will take a close look over the weekend for sure :)

On Fri, Nov 5, 2021 at 6:29 PM Kaxil Naik <kaxiln...@gmail.com> wrote:
 >
> Well done folks, this is exciting and will solve a lot of the current issues with "Dynamic DAGs" (especially ones that need fan-out)
 >
> On Fri, Nov 5, 2021 at 5:02 PM Daniel Imberman <daniel.imber...@gmail.com> wrote:
 >>
 >>
 >> Hello everyone!
 >>
 >>
>> I'd like to start a discussion about a new feature that we've been thinking about for a while, and that Ash mentioned in his Airflow Summit keynote:
 >>
 >>
>> Dynamic Task Mapping -- having the number of tasks vary at run time based on the result of a previous step
 >>
 >>
>> The tl;dr of this is that it adds a form of "dynamism" to Airflow DAGs that wasn't previously possible, and opens up a lot of exciting new workflows for DAGs.
 >>
 >>
 >> A short example to whet your appetites:
 >>
 >>
 >> @task
 >> def my_task(x, y):
 >>     return x + y
 >>
 >> @task
 >> def sum_values(x: Iterable[int]) -> int:
 >>     return sum(x)
 >>
 >> add_values = [1, 2, 3]
 >> added_values = my_task.partial(x=5).map(y=add_values)
 >> result = sum_values(added_values)
 >>
>> Now this example is trivial and doesn't need multiple tasks, but it shows the power of what we can do. At run time this would result in 4 task instances being run, and the end result is 21.
 >>
 >>
>> A more useful example would be to use an S3PrefixSensor, then operate on each file in a separate task:
 >>
 >>
 >> with dag:
 >>     sensor = S3PrefixSensor(
 >>         bucket_name='test-airflow-bucket',
 >>         prefix='vendor_a/incoming/{{ data_interval_start }}/',
 >>     )
 >>
 >>     @task
 >>     def process_file(key: str):
 >>         return len(S3Hook().read_key(key).splitlines())
 >>
 >>     processed = process_file.map(key=sensor.output)
 >>
 >>     @task
 >>     def sum_values(x: Iterable[int]) -> int:
 >>         return sum(x)
 >>
 >>     result = sum_values(processed)
 >>
 >>
>> This API change is (hopefully) deceptively simple, but implementing it is not straightforward -- we've tried to go into lots of detail in the AIP document:
 >>
 >>
>> AIP-42: Dynamic Task Mapping - Airflow - Apache Software Foundation
 >>
 >>
>> We look forward to discussing this further as this is a feature we’re all really excited about :).
 >>
 >>
 >> Happy Friday!
 >>
 >>
 >> Ash Berlin-Taylor, TP Chung, and Daniel Imberman
