uranusjr opened a new pull request #21641:
URL: https://github.com/apache/airflow/pull/21641
This is another PR that is likely much too big. Many (somewhat interrelated)
things are done in one.
#### Rewrite `@task` mapped operator expansion so it behaves correctly
Previously, a mapped `@task` was not expanded correctly because its mapped
argument would look something like this:
```python
mapped_arguments={
    "op_kwargs": {
        "arg": [1, 2, 3],
    },
}
```
This cannot be expanded using the normal logic, because `PythonOperator`
natively expects something like this instead:
```python
mapped_arguments={
    "op_kwargs": [
        {"arg": 1},
        {"arg": 2},
        {"arg": 3},
    ],
}
```
So `expand_mapped_task` is extended with additional logic to expand this case
correctly.
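The reshaping described above can be sketched as follows. This is a minimal illustration, not the PR's code. `expand_op_kwargs` is a hypothetical helper. The sketch assumes every value is a literal list and that several mapped keys combine as a cross product.

```python
import itertools
from typing import Any, Dict, List


def expand_op_kwargs(op_kwargs: Dict[str, List[Any]]) -> List[Dict[str, Any]]:
    """Turn {"arg": [1, 2, 3]} into one kwargs dict per mapped task.

    Hypothetical helper: assumes literal list values and that multiple
    mapped keys combine as a cross product (last key varies fastest).
    """
    keys = list(op_kwargs)
    # itertools.product yields one tuple per combination of mapped values.
    combos = itertools.product(*(op_kwargs[k] for k in keys))
    return [dict(zip(keys, combo)) for combo in combos]


mapped_arguments = {"op_kwargs": {"arg": [1, 2, 3]}}
print(expand_op_kwargs(mapped_arguments["op_kwargs"]))
# [{'arg': 1}, {'arg': 2}, {'arg': 3}]
```

With a single mapped key, this reproduces exactly the list-of-dicts shape that `PythonOperator` expects.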
#### Re-implement `expand_mapped_task` logic to be able to expand literals
Previously, `expand_mapped_task` only looked in TaskMap and could not expand
e.g. `.map(arg=[1, 2, 3])`. This implements more sophisticated logic to
collect information from mapped arguments and properly expand both literal and
XComArg inputs. Some refactoring was also done so the same logic can be reused
for task-runtime value unpacking.
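To expand literals and XComArg inputs together, you need the length of each mapped input. A literal's length is known up front. An XComArg's length is only known after its upstream task has run. The sketch below is a hedged illustration of that idea. `XComRef` is a hypothetical stand-in for XComArg, and the `task_map_lengths` dict stands in for a TaskMap lookup. It assumes multiple mapped arguments combine as a cross product.

```python
from dataclasses import dataclass
from math import prod
from typing import Any, Mapping


@dataclass(frozen=True)
class XComRef:
    """Hypothetical stand-in for XComArg: a reference to an upstream task."""

    task_id: str


def total_map_length(mapped_kwargs: Mapping[str, Any], task_map_lengths: Mapping[str, int]) -> int:
    """Count how many task instances a mapped task expands into.

    Assumes multiple mapped arguments combine as a cross product.
    """
    lengths = []
    for value in mapped_kwargs.values():
        if isinstance(value, XComRef):
            # XComArg-like input: its length was recorded when the upstream ran
            # (a plain dict stands in for the TaskMap table here).
            lengths.append(task_map_lengths[value.task_id])
        else:
            # Literal list or dict: its length is known at DAG-parse time.
            lengths.append(len(value))
    return prod(lengths)


kwargs = {"x": [1, 2, 3], "y": XComRef("make_list")}
print(total_map_length(kwargs, {"make_list": 2}))  # 6
```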
#### Additional checks added to ensure correct literal types are passed to `.map()`
This basically makes sure `.map()` only receives lists and dicts (or an XComArg,
whose value is already checked separately when the upstream task pushes to XCom).
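A check of this kind might look like the sketch below. It is an assumption-laden illustration, not the PR's implementation. `validate_mapped_kwargs` and `XComRef` (a placeholder for XComArg) are hypothetical names, and the choice of `TypeError` is this sketch's own.

```python
from typing import Any, Mapping


class XComRef:
    """Hypothetical placeholder for XComArg; its pushed value is checked later."""


def validate_mapped_kwargs(func_name: str, mapped_kwargs: Mapping[str, Any]) -> None:
    """Reject literal .map() arguments that are not lists or dicts."""
    for name, value in mapped_kwargs.items():
        # XComRef values are skipped: the upstream's pushed value is validated separately.
        if isinstance(value, (list, dict, XComRef)):
            continue
        raise TypeError(
            f"{func_name}.map() got an unmappable {type(value).__name__} "
            f"for argument {name!r}; expected a list, dict, or XComArg"
        )


validate_mapped_kwargs("add_one", {"x": [1, 2, 3], "y": XComRef()})  # passes silently
try:
    validate_mapped_kwargs("add_one", {"x": "abc"})
except TypeError as exc:
    print(exc)
```

A `str` is rejected here even though it is iterable. Without an explicit type check, mapping over `"abc"` could silently produce one task per character.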
#### Extend `TaskInstance.render_templates` to “unpack” mapped values for task execution
This is the main thing. After all template fields are rendered, an additional
step “unpacks” mapped values into individual ones based on `map_index`, further
mutating the operator object held by the TaskInstance. This reuses logic similar
to that of `expand_mapped_task` to calculate the total length of the map, so each
task instance can locate its `map_index` within the series.
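Locating a `map_index` within the series amounts to converting a flat index back into one position per mapped argument. This is a minimal sketch under stated assumptions. It assumes inputs are already rendered into plain lists, so XComArgs are resolved. It also assumes multiple mapped arguments form a cross product in `itertools.product` order, with the last argument varying fastest. `unpack_for_index` is a hypothetical name, not the PR's function.

```python
from typing import Any, Dict, List, Mapping


def unpack_for_index(resolved: Mapping[str, List[Any]], map_index: int) -> Dict[str, Any]:
    """Pick the value each mapped argument takes for a single map_index.

    Hypothetical helper: assumes resolved list inputs combined as a cross
    product in itertools.product order (the last key varies fastest).
    """
    total = 1
    for values in resolved.values():
        total *= len(values)
    if not 0 <= map_index < total:
        raise IndexError(f"map_index {map_index} out of range for {total} mapped tasks")
    result: Dict[str, Any] = {}
    remaining = map_index
    # Peel off positions like digits of a mixed-radix number, fastest key first.
    for key in reversed(list(resolved)):
        values = resolved[key]
        remaining, offset = divmod(remaining, len(values))
        result[key] = values[offset]
    return result


print(unpack_for_index({"a": [1, 2], "b": ["x", "y", "z"]}, 4))
```

Because the total length is computed first, an out-of-range index fails loudly instead of wrapping around.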
Previously, `render_template_fields` was only implemented on BaseOperator,
and a MappedOperator was first unmapped before its templates were rendered. This
approach was unfortunately wrong, since value unpacking (which needs to happen
after template rendering resolves XComArg) needs to be aware of the original
MappedOperator (mainly to access the user-supplied mapped kwargs). So the new
logic delays unmapping until _during_ `render_templates`. If a TaskInstance’s
`task` is a MappedOperator, its `render_templates` calls the
MappedOperator’s `render_template_fields`. That method unmaps itself, calls
`render_template_fields` on the unmapped operator, unpacks values for the
unmapped operator, _and then returns the unmapped operator_. The TaskInstance
then reassigns its `task` to the unmapped, rendered, unpacked operator. This
means that `TaskInstance.render_templates` may now have a side effect of
mutating its own `task` attribute. I don’t particularly like this, but couldn’t
find a cleaner approach without breaking much of the existing interface
(complicated by the fact that both `BaseOperator.render_template_fields` and
`BaseOperator.render_template` are public API). I think this is close to the
best possible interface given the existing constraints, and I have tried to
document this as clearly as possible for future maintainability.
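The delayed-unmapping control flow can be modelled with toy classes. Everything here is hypothetical and heavily simplified. `Operator`, `MappedOperator`, and `TaskInstance` only mimic the shape of the Airflow classes, and template rendering is a no-op. Unpacking just indexes each mapped list by `map_index` rather than using the real cross-product logic. The point is to show where the `task` reassignment side effect happens.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Union


@dataclass
class Operator:
    """Toy stand-in for an unmapped BaseOperator."""

    task_id: str
    op_kwargs: Dict[str, Any] = field(default_factory=dict)

    def render_template_fields(self, context: Dict[str, Any]) -> None:
        # Rendering is omitted; like the public API, it mutates in place and returns None.
        pass


@dataclass
class MappedOperator:
    """Toy stand-in holding the user-supplied mapped kwargs."""

    task_id: str
    mapped_kwargs: Dict[str, List[Any]]

    def unmap(self) -> Operator:
        return Operator(task_id=self.task_id)

    def render_template_fields(self, context: Dict[str, Any]) -> Operator:
        unmapped = self.unmap()
        unmapped.render_template_fields(context)
        # Unpacking needs self.mapped_kwargs, which is why unmapping is delayed until now.
        index = context["map_index"]
        unmapped.op_kwargs = {k: v[index] for k, v in self.mapped_kwargs.items()}
        return unmapped  # the caller must swap this in


@dataclass
class TaskInstance:
    task: Union[Operator, MappedOperator]
    map_index: int = -1

    def render_templates(self) -> None:
        context = {"map_index": self.map_index}
        if isinstance(self.task, MappedOperator):
            # Side effect: self.task becomes the unmapped, rendered, unpacked operator.
            self.task = self.task.render_template_fields(context)
        else:
            self.task.render_template_fields(context)


ti = TaskInstance(task=MappedOperator("add_one", {"arg": [1, 2, 3]}), map_index=1)
ti.render_templates()
print(type(ti.task).__name__, ti.task.op_kwargs)  # Operator {'arg': 2}
```

Returning the new operator instead of mutating `MappedOperator` in place keeps the mapped definition reusable for other map indexes. The trade-off is the reassignment inside `render_templates`.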
A few tests were added to check that this works (see
`TestMappedTaskInstanceReceiveValue`).