This is an automated email from the ASF dual-hosted git repository.

kaxil pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new fbd24b5720b Register nested Pydantic models for XCom deserialization (#67932)
fbd24b5720b is described below

commit fbd24b5720bf70c14acd286e4e914f4e5eb78888
Author: Kaxil Naik <[email protected]>
AuthorDate: Wed Jun 3 15:31:51 2026 +0100

    Register nested Pydantic models for XCom deserialization (#67932)
    
    The worker-side walk that registers an operator's structured-output
    classes (output_type) for XCom deserialization only registered the
    top-level type. iter_pydantic_models walked the annotation shape
    (Optional/Union/list) but never a model's own fields, so a model nested
    inside the declared type -- e.g. SubQuestion reachable via
    DecomposedQuestion.sub_questions -- was never allow-listed. A task that
    pushed such a nested model to XCom then failed to deserialize it with
    '... was not found in allow list for deserialization imports'.
    
    Recurse into a model's fields so every reachable model is yielded and
    registered. The existing seen set makes self-referential and mutually
    recursive model graphs terminate.
---
 task-sdk/src/airflow/sdk/serde/__init__.py  | 15 +++++++++++---
 task-sdk/tests/task_sdk/serde/test_serde.py | 31 +++++++++++++++++++++++++++++
 2 files changed, 43 insertions(+), 3 deletions(-)

diff --git a/task-sdk/src/airflow/sdk/serde/__init__.py b/task-sdk/src/airflow/sdk/serde/__init__.py
index 028d1d391cf..0d94ebc3282 100644
--- a/task-sdk/src/airflow/sdk/serde/__init__.py
+++ b/task-sdk/src/airflow/sdk/serde/__init__.py
@@ -130,9 +130,11 @@ def iter_pydantic_models(annotation: Any) -> Iterator[type]:
 
     Handles a bare model class, ``Optional`` / ``Union`` of models, and
     parameterized containers such as ``list[MyModel]`` -- the shapes accepted as
-    an operator ``output_type``. The agent (or operator) may emit an instance of
-    any model reachable from the annotation, so each must be registered for XCom
-    deserialization, not just the top-level type.
+    an operator ``output_type`` -- and recurses into a model's own fields so
+    models nested inside it (e.g. ``SubQuestion`` reachable via
+    ``DecomposedQuestion.sub_questions``) are yielded too. A task may push a
+    nested model on to XCom by itself, so each reachable model must be registered
+    for deserialization, not just the top-level type.
     """
     seen: set[Any] = set()
     stack: list[Any] = [annotation]
@@ -151,6 +153,13 @@ def iter_pydantic_models(annotation: Any) -> Iterator[type]:
             seen.add(tp)
             if is_pydantic_model(tp):
                 yield tp
+                # A model's fields may reference further models; walk them so a
+                # value nested inside the declared type is deserializable too.
+                # ``seen`` makes self-referential models terminate. ``getattr``
+                # because ``is_pydantic_model`` guarantees ``model_fields`` exists
+                # but does not narrow ``tp`` (typed ``type``) for the type checker.
+                for field in getattr(tp, "model_fields", {}).values():
+                    stack.append(field.annotation)
 
 
 def decode(d: dict[str, Any]) -> tuple[str, int, Any]:
diff --git a/task-sdk/tests/task_sdk/serde/test_serde.py b/task-sdk/tests/task_sdk/serde/test_serde.py
index bf41f4e3ed4..16d6851b40b 100644
--- a/task-sdk/tests/task_sdk/serde/test_serde.py
+++ b/task-sdk/tests/task_sdk/serde/test_serde.py
@@ -199,6 +199,27 @@ class C:
         return None
 
 
+class NestedLeaf(BaseModel):
+    value: int
+
+
+class NestedMid(BaseModel):
+    leaves: list[NestedLeaf]
+
+
+class NestedRoot(BaseModel):
+    mid: NestedMid
+    maybe: NestedLeaf | None
+
+
+class SelfRefNode(BaseModel):
+    name: str
+    children: list[SelfRefNode] = []
+
+
+SelfRefNode.model_rebuild()
+
+
 @pytest.mark.usefixtures("recalculate_patterns")
 class TestSerDe:
     def test_ser_primitives(self):
@@ -475,6 +496,16 @@ class TestSerDe:
         assert set(iter_pydantic_models(str)) == set()
         assert set(iter_pydantic_models(list[int])) == set()
 
+    def test_iter_pydantic_models_recurses_into_fields(self):
+        """Models nested inside a model's fields are yielded, not just the top-level type."""
+        assert set(iter_pydantic_models(NestedRoot)) == {NestedRoot, NestedMid, NestedLeaf}
+        # Holds when the declared type is a container of the root model.
+        assert set(iter_pydantic_models(list[NestedRoot])) == {NestedRoot, NestedMid, NestedLeaf}
+
+    def test_iter_pydantic_models_terminates_on_self_reference(self):
+        """A self-referential model must not loop forever."""
+        assert set(iter_pydantic_models(SelfRefNode)) == {SelfRefNode}
+
     def test_incompatible_version(self):
         data = dict(
             {

Reply via email to