kaxil opened a new pull request, #67875:
URL: https://github.com/apache/airflow/pull/67875
Follow-up to #67644 (typed structured output in the ``common-ai`` provider).
That change registered an operator's ``output_type`` Pydantic class in the XCom
deserialization allow-list as a side effect of operator ``__init__``. This PR
moves registration to the worker, which walks the loaded DAG before running a
task and registers every declared class. It fixes two cases the ``__init__``
approach could not reach and removes the construction-time side effect.
## Bugs fixed
**1. Mapped producer to typed consumer.**
Under the ``__init__`` approach, ``@task.llm(output_type=X).expand(...)``
registers ``X`` only when the mapped task unmaps and runs (which goes through
``__init__``), and only in the producer's own worker. The consumer task runs in
a different worker that loads the DAG with the producer still an unexpanded
``MappedOperator`` (no ``__init__``), so ``X`` is never registered there and
deserialization raises ``ImportError``. The
shipped ``example_llm_analysis_pipeline`` is exactly this shape (mapped
``analyze_ticket`` to ``store_results(list[TicketAnalysis])``).
**2. Workers that reconstruct operators without ``__init__``.** A parsed-DAG
cache that ``cloudpickle``-loads operators restores ``output_type`` as object
state but never re-runs ``__init__``, so the registration never happens and the
same ``ImportError`` occurs on a cache hit.
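The second case can be reproduced in miniature. The sketch below is illustrative only: it uses stdlib ``pickle`` as a stand-in for ``cloudpickle`` (both restore instance state without calling ``__init__``), and ``REGISTRY``, ``Ticket`` and ``FakeOperator`` are hypothetical names, not Airflow code.

```python
import pickle

# Hypothetical stand-in for the deserialization allow-list.
REGISTRY: set[str] = set()


class Ticket:
    """Stand-in for a Pydantic output model."""


class FakeOperator:
    """Hypothetical operator that registers its output type in __init__."""

    def __init__(self, output_type: type) -> None:
        self.output_type = output_type
        # The construction-time side effect the old approach relied on.
        REGISTRY.add(f"{output_type.__module__}.{output_type.__qualname__}")


op = FakeOperator(Ticket)
payload = pickle.dumps(op)

# Simulate a fresh worker process: its registry starts out empty.
REGISTRY.clear()
restored = pickle.loads(payload)

print(restored.output_type is Ticket)  # True: object state is restored
print(len(REGISTRY))                   # 0: __init__ did not run again
```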
## Design
- ``BaseOperator`` gains ``deserialization_allowed_class_fields:
ClassVar[tuple[str, ...]] = ()``. The ``common-ai`` operators set
``("output_type",)``.
- ``task_runner.parse()`` walks every task in the loaded DAG and registers
each reachable Pydantic model before any task runs. It reads the declared
fields off real operators (``getattr``) and off not-yet-expanded mapped
operators (``partial_kwargs``), so it reaches mapped producers and works
regardless of how the DAG was loaded.
- The generic Pydantic type-tree walk (a class, ``Union``/``Optional``,
``list[...]``, etc.) lives in ``serde`` as ``iter_pydantic_models``.
- A ``serde.SUPPORTS_OPERATOR_DESERIALIZATION_WALKER`` flag lets the
provider detect cores that have the walk. On those, the operator returns the
model instance; on older cores it dumps to a ``dict`` so the value is still
deserializable without an allow-list edit.
The operator ``__init__`` registration side effect is removed entirely.
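As a rough sketch of the two walks described above (not the code in this PR): ``Model`` stands in for ``pydantic.BaseModel`` so the example needs no third-party import, ``iter_models`` approximates what ``iter_pydantic_models`` is described as doing, and ``FakeOperator`` / ``FakeMappedOperator`` are hypothetical stand-ins for a real operator and an unexpanded mapped operator.

```python
import typing
from collections.abc import Iterator


class Model:
    """Local stand-in for pydantic.BaseModel (assumption for this sketch)."""


def iter_models(tp: object) -> Iterator[type]:
    """Yield every Model subclass reachable from a type annotation."""
    if typing.get_origin(tp) is None:
        # A plain object: yield it only if it is a Model subclass.
        if isinstance(tp, type) and issubclass(tp, Model):
            yield tp
        return
    # Parameterized forms: list[X], dict[str, X], Optional[X], Union[X, Y].
    for arg in typing.get_args(tp):
        yield from iter_models(arg)


class FakeOperator:
    # Mirrors the ClassVar described above; the class itself is hypothetical.
    deserialization_allowed_class_fields = ("output_type",)

    def __init__(self, output_type: object) -> None:
        self.output_type = output_type


class FakeMappedOperator:
    """Hypothetical unexpanded mapped operator: kwargs only, no instance."""

    def __init__(self, operator_class: type, partial_kwargs: dict) -> None:
        self.operator_class = operator_class
        self.partial_kwargs = partial_kwargs


def declared_models(task: object) -> Iterator[type]:
    """Read declared fields via getattr or, for mapped tasks, partial_kwargs."""
    if isinstance(task, FakeMappedOperator):
        fields = task.operator_class.deserialization_allowed_class_fields
        values = [task.partial_kwargs.get(name) for name in fields]
    else:
        fields = task.deserialization_allowed_class_fields
        values = [getattr(task, name, None) for name in fields]
    for value in values:
        yield from iter_models(value)


class Ticket(Model):
    pass


class Summary(Model):
    pass


tasks = [
    FakeOperator(output_type=list[Ticket]),
    FakeMappedOperator(FakeOperator, {"output_type": typing.Optional[Summary]}),
]
found = {m.__name__ for t in tasks for m in declared_models(t)}
print(sorted(found))  # ['Summary', 'Ticket']
```

The mapped branch never constructs the operator, which is why it reaches producers whose ``__init__`` has not run in the consumer's worker.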
## Security
The allow-list gate (``serde.deserialize``) is unchanged. Only a class an
operator declares, read from the trusted parsed DAG, is registered, and
registration stays exact-match (a tampered XCom naming an undeclared or
prefix-colliding class is still rejected). The class source is the same trust
tier as the DAG file, which already runs as code.
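To illustrate what exact-match registration rules out, here is a minimal sketch. ``ALLOWED``, ``register`` and ``is_allowed`` are hypothetical names; the sketch models only the registered entries, not the ``serde.deserialize`` gate or the ``[core] allowed_deserialization_classes`` setting.

```python
# Hypothetical stand-in for the set of registered qualified class names.
ALLOWED: set[str] = set()


def register(qualified_name: str) -> None:
    """Record one declared class under its full qualified name."""
    ALLOWED.add(qualified_name)


def is_allowed(qualified_name: str) -> bool:
    """Exact string match only: no prefix or pattern comparison."""
    return qualified_name in ALLOWED


register("my_dags.models.Ticket")

print(is_allowed("my_dags.models.Ticket"))       # True: the declared class
print(is_allowed("my_dags.models.TicketAdmin"))  # False: prefix collision
print(is_allowed("my_dags.models.Other"))        # False: undeclared class
```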
## Behavior notes
- A class that cannot be re-imported by qualified name (defined in a
function or another class, dynamically built, or a parameterized generic) is
skipped with a warning at worker startup and then fails to deserialize at the
consumer. The previous code raised at construction time instead. To avoid the
failure, define the class at module scope.
- Cross-DAG ``xcom_pull`` is unchanged: the consumer worker only loads its
own DAG, so cross-DAG consumers still need the class in ``[core]
allowed_deserialization_classes``.
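For the first note, one way to check ahead of time whether a class is re-importable by qualified name is sketched below. ``reimportable`` is a hypothetical helper, not part of this PR, and it assumes the lookup is "import the module, then fetch a single top-level attribute".

```python
import importlib
from fractions import Fraction


def reimportable(cls: type) -> bool:
    """Return True if cls can be recovered as a top-level module attribute."""
    # A dotted __qualname__ means the class is nested in a function
    # ("<locals>") or in another class, so a top-level lookup cannot find it.
    if "." in cls.__qualname__:
        return False
    try:
        module = importlib.import_module(cls.__module__)
    except ImportError:
        return False
    # Dynamically built classes are usually not bound under their own name.
    return getattr(module, cls.__qualname__, None) is cls


def make_local() -> type:
    class Local:
        pass

    return Local


Dynamic = type("Built", (), {})  # bound to a name other than "Built"

print(reimportable(Fraction))      # True: defined at module scope
print(reimportable(make_local()))  # False: defined inside a function
print(reimportable(Dynamic))       # False: no module attribute named "Built"
```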
## Verification
Confirmed end-to-end in a standalone deployment with a mapped producer (an
operator returning a module-scope ``BaseModel`` without calling an LLM) feeding
a downstream consumer that runs in a separate worker.
Before this change, the consumer task raises at deserialization:
```
ImportError: <module>.Ticket was not found in allow list for deserialization imports.
```
After, the consumer deserializes the mapped producer's XComs into real model
instances and uses attribute access:
```
store got Ticket: priority='high' summary="ticket for 'a'"
store got Ticket: priority='high' summary="ticket for 'b'"
store got Ticket: priority='high' summary="ticket for 'c'"
```
In the resulting run, the mapped producer (`FakeLLM`, map indices 0-2) and the
typed consumer (`store`) all succeed:
<img width="1680" height="1000" alt="fix_grid"
src="https://github.com/user-attachments/assets/2268d685-44a6-4990-b28c-320beb9278a1"
/>
Unit tests cover ``iter_pydantic_models`` shapes and the worker walk for
real and mapped operators, including that a bad (non-importable) declaration is
logged and skipped rather than failing task startup. The walk test fails if the
mapped branch is reverted.