uranusjr opened a new pull request #21641:
URL: https://github.com/apache/airflow/pull/21641


   This is another PR that is likely much too big. Many (sort of interrelated) 
things done in one.
   
   #### Rewrite `@task` mapped operator expansion so it behaves correctly
   
   Previously, a mapped `@task` was not expanded correctly because its mapped 
argument would look something like this:
   
   ```python
   mapped_arguments={
       "op_kwargs": {
           "arg": [1, 2, 3],
       },
   }
   ```
   
   this cannot be expanded using normal logic, because `PythonOperator` 
natively expects something like this instead:
   
   ```python
   mapped_arguments={
       "op_kwargs": [
           {"arg": 1},
           {"arg": 2},
           {"arg": 3},
       ],
   }
   ```
   
   so `expand_mapped_task` gains additional logic to expand the `@task` form 
correctly.
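
The reshaping described above can be sketched as follows. This is a minimal illustration, not the code in this PR: `split_op_kwargs` is a hypothetical helper name, and the sketch assumes a single mapped argument whose value is a list.

```python
from typing import Any, Dict, List


def split_op_kwargs(op_kwargs: Dict[str, List[Any]]) -> List[Dict[str, Any]]:
    """Turn {"arg": [1, 2, 3]} into [{"arg": 1}, {"arg": 2}, {"arg": 3}].

    Hypothetical helper for illustration; it only handles one mapped argument.
    """
    if len(op_kwargs) != 1:
        raise ValueError("this sketch only handles a single mapped argument")
    ((name, values),) = op_kwargs.items()
    # One op_kwargs dict per element, which is the shape PythonOperator expects.
    return [{name: value} for value in values]


expanded = split_op_kwargs({"arg": [1, 2, 3]})
print(expanded)  # [{'arg': 1}, {'arg': 2}, {'arg': 3}]
```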
   
   #### Re-implement `expand_mapped_task` logic to be able to expand literals
   
   Previously, `expand_mapped_task` only looked in TaskMap and could not expand 
e.g. `.map(arg=[1, 2, 3])`. This implements more sophisticated logic to 
collect information from mapped arguments and expand literal and XComArg inputs 
properly. Some refactoring was also done so the same logic can be reused for 
task-runtime value unpacking.
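
A rough sketch of how lengths might be collected from both kinds of input. Everything here is an assumption made for illustration: `UpstreamRef` stands in for an XComArg, the `recorded_lengths` dict stands in for a TaskMap lookup, and combining several mapped arguments as a product of their lengths is assumed rather than taken from this description.

```python
import math
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass(frozen=True)
class UpstreamRef:
    """Hypothetical stand-in for an XComArg: it only names the upstream task."""

    task_id: str


def total_map_length(
    mapped_kwargs: Dict[str, Any],
    recorded_lengths: Dict[str, int],
) -> Optional[int]:
    """Return how many task instances to create, or None if not known yet."""
    lengths = []
    for value in mapped_kwargs.values():
        if isinstance(value, UpstreamRef):
            # Plays the role of the TaskMap lookup: the length is only known
            # once the upstream task has pushed its result.
            length = recorded_lengths.get(value.task_id)
            if length is None:
                return None
            lengths.append(length)
        else:
            # Literal list or dict: the length is known at parse time.
            lengths.append(len(value))
    # Assumption: several mapped arguments combine as a cross product.
    return math.prod(lengths)
```

With this sketch, `total_map_length({"arg": [1, 2, 3]}, {})` needs no upstream information at all, which is the literal case that the old TaskMap-only lookup could not handle.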
   
   #### Additional checks added to ensure correct literal types are passed to `.map()`
   
   These checks make sure `.map()` only receives lists and dicts (or XComArg, 
which is already checked separately when the upstream pushes to XCom).
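
A minimal sketch of such a check, assuming a hypothetical `validate_map_kwargs` helper. The `passthrough_types` parameter is also an assumption: it marks where XComArg-like values would be skipped because they are validated elsewhere.

```python
from typing import Any, Dict, Tuple


def validate_map_kwargs(
    kwargs: Dict[str, Any],
    passthrough_types: Tuple[type, ...] = (),
) -> None:
    """Raise ValueError if any mapped literal is not a list or a dict."""
    for name, value in kwargs.items():
        if passthrough_types and isinstance(value, passthrough_types):
            # Assumed to be checked later, when the upstream pushes its value.
            continue
        if not isinstance(value, (list, dict)):
            raise ValueError(
                f"cannot map over {name!r}: expected list or dict, "
                f"got {type(value).__name__}"
            )


validate_map_kwargs({"arg": [1, 2, 3], "other": {"a": 1}})  # passes silently
```

Note that a string is rejected by this sketch even though it is iterable, which matches the stated intent of accepting only lists and dicts.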
   
   #### Extend `TaskInstance.render_templates` to “unpack” mapped values for task execution
   
   This is the main thing. After all template fields are rendered, an additional 
step “unpacks” mapped values into individual ones based on 
`map_index`, which further mutates the operator object held by a TaskInstance. 
This reuses similar logic from `expand_mapped_task` to calculate the total 
length of the map, so it can locate its `map_index` inside the series.
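
To illustrate locating a `map_index` inside the series, here is a sketch under stated assumptions: `unpack_for_index` is a hypothetical name, all mapped values are already-resolved lists, and several mapped arguments are assumed to combine as a cross product in which the last argument varies fastest. The actual ordering used by the PR is not stated in this description.

```python
from typing import Any, Dict, List


def unpack_for_index(
    mapped_kwargs: Dict[str, List[Any]],
    map_index: int,
) -> Dict[str, Any]:
    """Pick the single value of each mapped argument for one map_index."""
    total = 1
    for values in mapped_kwargs.values():
        total *= len(values)
    if not 0 <= map_index < total:
        raise IndexError(f"map_index {map_index} out of range for length {total}")

    unpacked: Dict[str, Any] = {}
    remaining = map_index
    # Peel off positions like digits of a mixed-radix number, starting from
    # the last argument (assumed to vary fastest).
    for name in reversed(list(mapped_kwargs)):
        values = mapped_kwargs[name]
        remaining, position = divmod(remaining, len(values))
        unpacked[name] = values[position]
    return unpacked


mapped = {"x": [1, 2], "y": ["a", "b", "c"]}
for index in range(6):
    print(index, unpack_for_index(mapped, index))
```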
   
   Previously, `render_template_fields` was only implemented on BaseOperator, 
and a MappedOperator is first unmapped before templates are rendered. This 
approach was unfortunately wrong, since value unpacking (which needs to happen 
after template rendering resolves XComArg) needs to be aware of the original 
MappedOperator (mainly to access the user-supplied mapped kwargs). So the new 
logic delays unmapping until _during_ `render_templates`. If a TaskInstance’s 
`task` is a MappedOperator, its `render_templates` calls the 
MappedOperator’s `render_template_fields`, which unmaps itself, calls 
`render_template_fields` on the unmapped operator, unpacks values for the 
unmapped operator, _and then returns the unmapped operator_. The TaskInstance 
then reassigns its `task` to the unmapped, rendered, unpacked operator. This 
means that `TaskInstance.render_templates` may now have the side effect of 
mutating its own `task` attribute. I don’t particularly like this, but couldn’t 
find a cleaner approach without breaking much of the existing interface (complicated 
by the fact that both `BaseOperator.render_template_fields` and 
`BaseOperator.render_template` are public API). I think this is close to the 
best possible interface considering existing constraints (and I tried to document 
this as clearly as possible for future maintainability).
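
The control flow described above can be sketched with toy classes. None of these are Airflow's real classes or signatures: `ToyOperator`, `ToyMappedOperator` and `ToyTaskInstance` are stand-ins, templating is reduced to `str.format`, and exactly one mapped list argument is assumed.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Union


@dataclass
class ToyOperator:
    kwargs: Dict[str, Any]

    def render_template_fields(self, context: Dict[str, Any]) -> None:
        # Render in place; only string values are treated as templates here.
        self.kwargs = {
            key: value.format(**context) if isinstance(value, str) else value
            for key, value in self.kwargs.items()
        }


@dataclass
class ToyMappedOperator:
    partial_kwargs: Dict[str, Any]
    mapped_kwargs: Dict[str, List[Any]]

    def render_template_fields(
        self, context: Dict[str, Any], map_index: int
    ) -> ToyOperator:
        # 1. Unmap, 2. render the unmapped operator, 3. unpack, 4. return it.
        unmapped = ToyOperator({**self.partial_kwargs, **self.mapped_kwargs})
        unmapped.render_template_fields(context)
        (name,) = self.mapped_kwargs  # sketch assumes one mapped argument
        unmapped.kwargs[name] = unmapped.kwargs[name][map_index]
        return unmapped


@dataclass
class ToyTaskInstance:
    task: Union[ToyOperator, ToyMappedOperator]
    map_index: int = -1

    def render_templates(self, context: Dict[str, Any]) -> None:
        if isinstance(self.task, ToyMappedOperator):
            # The side effect discussed above: `task` is replaced.
            self.task = self.task.render_template_fields(context, self.map_index)
        else:
            self.task.render_template_fields(context)


ti = ToyTaskInstance(
    task=ToyMappedOperator({"label": "run {ds}"}, {"arg": [1, 2, 3]}),
    map_index=1,
)
ti.render_templates({"ds": "2022-02-17"})
print(type(ti.task).__name__, ti.task.kwargs)
```

Unpacking needs access to `mapped_kwargs`, which only the mapped operator holds, so in this sketch the mapped operator has to drive the rendering and hand back the result.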
   
   A few tests were added to check this is working (see 
`TestMappedTaskInstanceReceiveValue`).

