kaxil opened a new pull request, #67875:
URL: https://github.com/apache/airflow/pull/67875

   Follow-up to #67644 (typed structured output in the ``common-ai`` provider). 
That change registered an operator's ``output_type`` Pydantic class in the XCom 
deserialization allow-list as a side effect of operator ``__init__``. This 
moves registration to the worker, which walks the loaded DAG before running a 
task and registers every declared class. It fixes two cases the ``__init__`` 
approach could not reach and removes the construction-time side effect.
   
   ## Bugs fixed
   
   **1. Mapped producer to typed consumer.** 
``@task.llm(output_type=X).expand(...)`` registers ``X`` only when the mapped 
task unmaps and runs (which goes through ``__init__``), in the producer's own 
worker. The consumer task runs in a different worker that loads the DAG with 
the producer still an unexpanded ``MappedOperator`` (no ``__init__``), so ``X`` 
is never registered there and deserialization raises ``ImportError``. The 
shipped ``example_llm_analysis_pipeline`` is exactly this shape (mapped 
``analyze_ticket`` to ``store_results(list[TicketAnalysis])``).
   
   **2. Workers that reconstruct operators without ``__init__``.** A parsed-DAG 
cache that ``cloudpickle``-loads operators restores ``output_type`` as object 
state but never re-runs ``__init__``, so the registration never happens and the 
same ``ImportError`` occurs on a cache hit.
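Bug 2 comes from how Python rebuilds objects. Unpickling, which is what a ``cloudpickle``-backed cache does on a hit, restores an instance's attributes without calling ``__init__``. The sketch below shows the effect with ``copy.copy``, which uses the same default ``__reduce_ex__`` reconstruction protocol as pickle but does not need the class to be importable. ``OldStyleOperator``, ``Ticket`` and the ``registered`` set are hypothetical stand-ins, not Airflow code:

```python
import copy

registered: set = set()  # hypothetical stand-in for the XCom deserialization allow-list


class Ticket:  # stand-in for a Pydantic output_type
    pass


class OldStyleOperator:
    """Hypothetical operator that registers output_type in __init__ (the removed approach)."""

    def __init__(self, output_type: type) -> None:
        self.output_type = output_type
        registered.add(output_type.__qualname__)  # construction-time side effect


op = OldStyleOperator(Ticket)
registered.clear()  # simulate a fresh worker process whose allow-list starts empty

# copy.copy rebuilds the object through __reduce_ex__, the same protocol unpickling
# uses: cls.__new__ plus a __dict__ update, with no __init__ call.
restored = copy.copy(op)
print(restored.output_type is Ticket)  # True: the attribute state came back
print(registered)  # set(): the registration side effect did not run again
```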
   
   ## Design
   
   - ``BaseOperator`` gains ``deserialization_allowed_class_fields: 
ClassVar[tuple[str, ...]] = ()``. The ``common-ai`` operators set 
``("output_type",)``.
   - ``task_runner.parse()`` walks every task in the loaded DAG and registers 
each reachable Pydantic model before any task runs. It reads the declared 
fields off real operators (``getattr``) and off not-yet-expanded mapped 
operators (``partial_kwargs``), so it reaches mapped producers and works 
regardless of how the DAG was loaded.
   - The generic Pydantic type-tree walk (a class, ``Union``/``Optional``, 
``list[...]``, etc.) lives in ``serde`` as ``iter_pydantic_models``.
   - A ``serde.SUPPORTS_OPERATOR_DESERIALIZATION_WALKER`` flag lets the 
provider detect cores that have the walk. On those, the operator returns the 
model instance; on older cores it dumps to a ``dict`` so the value is still 
deserializable without an allow-list edit.
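The type-tree walk can be sketched with ``typing.get_origin`` and ``typing.get_args``. This is a hypothetical re-implementation named ``walk_models``, not the actual ``serde.iter_pydantic_models`` code. It uses Pydantic's ``BaseModel`` as the marker and falls back to an empty stand-in class when Pydantic is not installed, so the sketch still runs:

```python
from typing import Any, Iterator, Optional, Union, get_args, get_origin

try:
    from pydantic import BaseModel
except ImportError:  # stand-in so the sketch also runs without pydantic installed
    class BaseModel:  # type: ignore[no-redef]
        pass


def walk_models(tp: Any) -> Iterator[type]:
    """Yield each BaseModel subclass reachable from a type annotation, once, in order."""
    seen: set = set()

    def visit(node: Any) -> Iterator[type]:
        if get_origin(node) is not None:
            # Union/Optional, list[...], dict[...], Annotated[...]: recurse into the arguments.
            for arg in get_args(node):
                yield from visit(arg)
        elif isinstance(node, type) and issubclass(node, BaseModel) and node not in seen:
            seen.add(node)
            yield node

    yield from visit(tp)


class Ticket(BaseModel):
    priority: str


class Note(BaseModel):
    text: str


print(list(walk_models(list[Ticket])))
print(list(walk_models(Union[Ticket, Note, None])))
print(list(walk_models(dict[str, Optional[list[Ticket]]])))
```

Checking ``get_origin`` first keeps aliases such as ``list[Ticket]`` from being treated as classes. The ``seen`` set means a model that appears in several branches is yielded only once.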
   
   The operator ``__init__`` registration side effect is removed entirely.
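Here is a minimal sketch of the worker-side walk that replaces the removed side effect. It uses stand-in classes for an operator, a ``common-ai``-style operator and an unexpanded mapped task. ``OperatorStub``, ``TypedLLMOperatorStub``, ``MappedOperatorStub`` and its ``operator_class`` attribute are all hypothetical, not Airflow's classes. The sketch reads declared fields with ``getattr`` on real operators and from ``partial_kwargs`` on mapped ones, and it logs and skips a class that is not defined at module scope. To stay short it registers only bare classes, whereas the change described above also runs each value through the Pydantic type-tree walk.

```python
import logging
from dataclasses import dataclass, field
from typing import Any, ClassVar, Iterable

logging.basicConfig(level=logging.WARNING)
log = logging.getLogger("worker_walk_sketch")


class OperatorStub:
    """Hypothetical stand-in for BaseOperator with the new ClassVar (default: nothing)."""

    deserialization_allowed_class_fields: ClassVar[tuple] = ()


class TypedLLMOperatorStub(OperatorStub):
    """Hypothetical stand-in for a common-ai operator that declares output_type."""

    deserialization_allowed_class_fields: ClassVar[tuple] = ("output_type",)

    def __init__(self, output_type: Any = None) -> None:
        self.output_type = output_type


@dataclass
class MappedOperatorStub:
    """Hypothetical unexpanded mapped task: the operator's __init__ never ran."""

    operator_class: type
    partial_kwargs: dict = field(default_factory=dict)


def declared_values(task: Any) -> list:
    if isinstance(task, MappedOperatorStub):
        # Not yet expanded: read the declared fields from partial_kwargs.
        names = task.operator_class.deserialization_allowed_class_fields
        return [task.partial_kwargs.get(name) for name in names]
    # Real operator instance: read the declared fields as attributes.
    names = getattr(type(task), "deserialization_allowed_class_fields", ())
    return [getattr(task, name, None) for name in names]


def register_declared_classes(tasks: Iterable[Any], allow_list: set) -> None:
    for task in tasks:
        for value in declared_values(task):
            if not isinstance(value, type):
                continue
            name = f"{value.__module__}.{value.__qualname__}"
            if "." in value.__qualname__:
                # Nested in a function or class: not importable by name. Log, don't fail.
                log.warning("Skipping %s: not defined at module scope", name)
                continue
            allow_list.add(name)  # exact qualified name only


class Ticket:  # stand-in for a module-scope Pydantic model
    pass


def build_local_model() -> type:
    class LocalModel:  # defined inside a function
        pass

    return LocalModel


allowed: set = set()
register_declared_classes(
    [
        MappedOperatorStub(TypedLLMOperatorStub, {"output_type": Ticket}),  # mapped producer
        TypedLLMOperatorStub(output_type=build_local_model()),  # bad declaration: skipped
        OperatorStub(),  # declares nothing
    ],
    allowed,
)
print(sorted(allowed))
```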
   
   ## Security
   
   The allow-list gate (``serde.deserialize``) is unchanged. Only a class an 
operator declares, read from the trusted parsed DAG, is registered, and 
registration stays exact-match (a tampered XCom naming an undeclared or 
prefix-colliding class is still rejected). The class source is the same trust 
tier as the DAG file, which already runs as code.
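A few lines show why exact matching matters. This minimal sketch assumes the registry is a set of fully qualified class names. ``REGISTERED``, ``allowed_exact``, ``allowed_by_prefix`` and ``check_allowed`` are hypothetical names, and the error text mirrors the log quoted under Verification rather than Airflow's source:

```python
# Hypothetical registry: the worker registered exactly one declared class.
REGISTERED = frozenset({"myproj.models.Ticket"})


def allowed_exact(classname: str) -> bool:
    return classname in REGISTERED


def allowed_by_prefix(classname: str) -> bool:
    # The unsafe alternative: a registered name is also a prefix of other names.
    return any(classname.startswith(entry) for entry in REGISTERED)


def check_allowed(classname: str) -> None:
    if not allowed_exact(classname):
        raise ImportError(
            f"{classname} was not found in allow list for deserialization imports."
        )


check_allowed("myproj.models.Ticket")  # declared class: accepted
for tampered in ("myproj.models.TicketAdmin", "myproj.models.Ticket_v2", "os.system"):
    try:
        check_allowed(tampered)
    except ImportError as err:
        print(f"rejected (prefix match would say {allowed_by_prefix(tampered)}): {err}")
```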
   
   ## Behavior notes
   
   - A class that cannot be re-imported by qualified name (defined in a 
function or another class, dynamically built, or a parameterized generic) is 
skipped with a warning at worker startup and fails to deserialize at the 
consumer. The previous code raised at construction time instead. In both cases 
the class must be defined at module scope.
   - Cross-DAG ``xcom_pull`` is unchanged: the consumer worker only loads its 
own DAG, so cross-DAG consumers still need the class in ``[core] 
allowed_deserialization_classes``.
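The module-scope requirement reduces to one check: does ``module`` + ``__qualname__`` resolve back to the same class object? This minimal sketch assumes the worker resolves a declared class by importing its module and doing a single attribute lookup. ``importable_by_qualname`` is a hypothetical helper, not an Airflow API:

```python
import importlib
from fractions import Fraction


def importable_by_qualname(cls: type) -> bool:
    """Return True only if cls.__module__ + cls.__qualname__ resolves back to cls."""
    # A dotted __qualname__ means the class is nested in a function
    # ("f.<locals>.C") or in another class ("Outer.C"): not module scope.
    if "." in cls.__qualname__:
        return False
    try:
        module = importlib.import_module(cls.__module__)
    except (ImportError, ValueError):
        return False
    # A class whose name the module does not bind (for example one built with
    # type(...) and never assigned under that name) fails this lookup.
    return getattr(module, cls.__qualname__, None) is cls


class Outer:
    class Inner:  # nested in a class
        pass


def make_local_class() -> type:
    class Local:  # nested in a function
        pass

    return Local


print(importable_by_qualname(Fraction))  # a module-scope class from the stdlib
print(importable_by_qualname(Outer.Inner))
print(importable_by_qualname(make_local_class()))
print(importable_by_qualname(type("Dynamic", (), {})))
```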
   
   ## Verification
   
   Confirmed end-to-end in a standalone deployment with a mapped producer (an 
operator returning a module-scope ``BaseModel`` without calling an LLM) feeding 
a downstream consumer that runs in a separate worker.
   
   Before this change, the consumer task raises at deserialization:
   
   ```
   ImportError: <module>.Ticket was not found in allow list for deserialization imports.
   ```
   
   After, the consumer deserializes the mapped producer's XComs into real model 
instances and uses attribute access:
   
   ```
   store got Ticket: priority='high' summary="ticket for 'a'"
   store got Ticket: priority='high' summary="ticket for 'b'"
   store got Ticket: priority='high' summary="ticket for 'c'"
   ```
   
   In that run, the mapped producer (`FakeLLM`, map indices 0-2) and the typed 
consumer (`store`) all succeed:
   
   <img width="1680" height="1000" alt="fix_grid" src="https://github.com/user-attachments/assets/2268d685-44a6-4990-b28c-320beb9278a1" />
   
   Unit tests cover ``iter_pydantic_models`` shapes and the worker walk for 
real and mapped operators, including that a bad (non-importable) declaration is 
logged and skipped rather than failing task startup. The walk test fails if the 
mapped branch is reverted.
   

