I took an initial deeper look and I have a few comments and questions.

Comments:
1) The scope looks great. I love that we focus on only one case (map/reduce) first. There is always the possibility of implementing more sophisticated and dynamic task creation later, but with this one it looks like 20% of the effort solves 80% of the cases.

2) Choosing map() is a very good idea, and it follows the interface of the built-in map https://book.pythontips.com/en/latest/map_filter.html#map which is much better than the for/append presented as the alternative.

3) Similarly, having partial follow the functools one will make it familiar to all Python developers who already know it: https://docs.python.org/3/library/functools.html#functools.partial

4) Using Task Groups for depth-first execution sounds like a very good idea. Keeping Task Group as a "UI concept only" is very limiting, and it's about time we made it a first-class citizen for the scheduler, and especially for the executors as well. This will make it possible (as a separate change) to have nicer success/failure callbacks and SLAs on task groups, for example (those now require some hacks).

5) I like that you added the small but necessary UI decisions - paginating tasks sounds like a good idea. The details of the UI/navigation between multiple tasks should be well thought out (I have some questions/proposals below).

Questions/proposals:

1) This is a bit unspecified - do we also allow generators as mappable values? The example shows a List, the docs also mention Dictionaries, and "maximum_map_size" in particular assumes that we know the size of the map. The question is: should we know it upfront and fail fast, or should we also allow a generator and fail only when we reach the maximum size? There is an exclusion for "infinite" generators (obviously), but what about "finite" ones? That is not entirely clear (and might come down to an implementation detail of scheduling), but I wonder if you have thought about this and have an answer.
Allowing generators would mean a bit less boilerplate, but it might not be needed at all: such a generator could simply be materialised into a List and the list used afterwards. This has memory implications, but the eventual implementation may not benefit from a generator's lower memory use anyway. It is also different from the "lazy lists" on the "reduce" side, since the mapped values will likely be much smaller than XCom results - so maybe it's not needed at all.

2) For the UI part, I think we should consider "filtering" a first-class citizen from day one. Filtering for "failed" tasks seems like a super-useful feature for operations people.

3) I am afraid that adding mapping_id to the primary key will not work due to MySQL limitations (and I think you kind of expected there might be problems with it). There is a problem with the primary key/index for MySQL (our Elephant in the Room). MySQL has a very limited maximum size for an index entry (we've already hit it with utf8mb4 encoding). If we try to add a JSONB mapping_id to the primary key, it is virtually guaranteed that size will be exceeded; I also believe a JSONB field cannot be added to any index (including the primary key) on MySQL at all. You can add a generated column derived from the JSONB column and index that, but in our case we cannot assume the structure of the JSONB field, so we cannot define the right generated column: https://stackoverflow.com/questions/38389075/how-to-create-index-on-json-column-in-mysq. The document proposes an alternative task id override for this case, but I agree it's messy. A better approach would be to generate a unique hash of the JSON data as part of the primary key instead, or even to add a "mapping_id" that is simply a sequentially increasing index into the map array (this one sounds like the best and simplest idea, I think).
But I think it should not be MySQL specific - it should be implemented for Postgres in the same way, even though Postgres has no problem with indexing JSONB fields.

J.

On Sat, Nov 6, 2021 at 11:45 PM Xinbin Huang <b...@apache.org> wrote:
>
> Exciting to see this! This opens a whole lot of new possibilities!
>
> Will take a deeper look and get back to you with questions/feedback.
>
> Cheers!
>
> On Fri, Nov 5, 2021 at 12:10 PM Xiaodong Deng <xdd...@apache.org> wrote:
>>
>> Looks nice 👍 I do think this will allow much higher flexibility for the
>> users.
>>
>> Will have a look in detail as well, and get back to you if I have any
>> questions/feedback.
>>
>> Thanks!
>>
>> XD
>>
>> On Fri, Nov 5, 2021 at 19:52 Jarek Potiuk <ja...@potiuk.com> wrote:
>>>
>>> Exciting, and yeah - I've heard MANY requests from our users for this
>>> kind of feature!
>>>
>>> Looks good - I will take a close look over the weekend for sure :)
>>>
>>> On Fri, Nov 5, 2021 at 6:29 PM Kaxil Naik <kaxiln...@gmail.com> wrote:
>>> >
>>> > Well done folks, this is exciting and will solve a lot of the current
>>> > issues with "Dynamic DAGs" (especially ones that need fan-out).
>>> >
>>> > On Fri, Nov 5, 2021 at 5:02 PM Daniel Imberman
>>> > <daniel.imber...@gmail.com> wrote:
>>> >>
>>> >> Hello everyone!
>>> >>
>>> >> I'd like to start a discussion about a new feature that we've been
>>> >> thinking about for a while, and that Ash mentioned in his Airflow Summit
>>> >> keynote:
>>> >>
>>> >> Dynamic Task Mapping -- having the number of tasks vary at run time
>>> >> based on the result of a previous step
>>> >>
>>> >> The tl;dr of this is that it adds a form of "dynamism" to Airflow DAGs
>>> >> that wasn't previously possible, and opens up a lot of exciting new
>>> >> workflows for DAGs.
>>> >> A short example to whet your appetites:
>>> >>
>>> >> @task
>>> >> def my_task(x, y):
>>> >>     return x + y
>>> >>
>>> >> @task
>>> >> def sum_values(x: Iterable[int]) -> int:
>>> >>     return sum(x)
>>> >>
>>> >> add_values = [1, 2, 3]
>>> >>
>>> >> added_values = my_task.partial(x=5).map(y=add_values)
>>> >> result = sum_values(added_values)
>>> >>
>>> >> Now this example is trivial and doesn't need multiple tasks, but it
>>> >> shows the power of what we can do. At run time this would result in 4
>>> >> task instances being run, and the end result is 21.
>>> >>
>>> >> A more useful example would be to use an S3PrefixSensor, then operate on
>>> >> each file in a separate task:
>>> >>
>>> >> with dag:
>>> >>     sensor = S3PrefixSensor(
>>> >>         bucket_name='test-airflow-bucket',
>>> >>         prefix='vendor_a/incoming/{{ data_interval_start }}/',
>>> >>     )
>>> >>
>>> >>     @task
>>> >>     def process_file(key: str):
>>> >>         return len(S3Hook().read_key(key).splitlines())
>>> >>
>>> >>     processed = process_file.map(key=sensor.output)
>>> >>
>>> >>     @task
>>> >>     def sum_values(x: Iterable[int]) -> int:
>>> >>         return sum(x)
>>> >>
>>> >>     result = sum_values(processed)
>>> >>
>>> >> This API change is (hopefully) deceptively simple, but implementing it
>>> >> is not straightforward -- we've tried to go into lots of detail in the
>>> >> AIP document:
>>> >>
>>> >> AIP-42: Dynamic Task Mapping - Airflow - Apache Software Foundation
>>> >>
>>> >> We look forward to discussing this further as this is a feature we're
>>> >> all really excited about :).
>>> >>
>>> >> Happy Friday!
>>> >>
>>> >> Ash Berlin-Taylor, TP Chung, and Daniel Imberman
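As a footnote on comments 2 and 3 above: the quoted my_task.partial(x=5).map(y=[1, 2, 3]) example has a direct stdlib analogue, which is exactly what makes the proposed API familiar. A sketch using plain functions (no Airflow involved):

```python
from functools import partial

def my_task(x, y):
    return x + y

def sum_values(xs):
    return sum(xs)

pinned = partial(my_task, 5)                 # like my_task.partial(x=5)
added_values = list(map(pinned, [1, 2, 3]))  # like .map(y=add_values)
result = sum_values(added_values)

print(added_values, result)  # [6, 7, 8] 21
```

The end result, 21, matches the "4 task instances" walkthrough in the quoted example: three mapped additions plus one reduce.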