This is an automated email from the ASF dual-hosted git repository.
kaxil pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new fbd24b5720b Register nested Pydantic models for XCom deserialization
(#67932)
fbd24b5720b is described below
commit fbd24b5720bf70c14acd286e4e914f4e5eb78888
Author: Kaxil Naik <[email protected]>
AuthorDate: Wed Jun 3 15:31:51 2026 +0100
Register nested Pydantic models for XCom deserialization (#67932)
The worker-side walk that registers an operator's structured-output
classes (output_type) for XCom deserialization only registered the
top-level type. iter_pydantic_models walked the annotation shape
(Optional/Union/list) but never a model's own fields, so a model nested
inside the declared type -- e.g. SubQuestion reachable via
DecomposedQuestion.sub_questions -- was never allow-listed. A task that
pushed such a nested model to XCom then failed to deserialize it with
'... was not found in allow list for deserialization imports'.
Recurse into each model's fields so that every reachable model is yielded and
registered. The existing `seen` set guarantees that the walk terminates on
self-referential and mutually recursive model graphs.
---
task-sdk/src/airflow/sdk/serde/__init__.py | 15 +++++++++++---
task-sdk/tests/task_sdk/serde/test_serde.py | 31 +++++++++++++++++++++++++++++
2 files changed, 43 insertions(+), 3 deletions(-)
diff --git a/task-sdk/src/airflow/sdk/serde/__init__.py
b/task-sdk/src/airflow/sdk/serde/__init__.py
index 028d1d391cf..0d94ebc3282 100644
--- a/task-sdk/src/airflow/sdk/serde/__init__.py
+++ b/task-sdk/src/airflow/sdk/serde/__init__.py
@@ -130,9 +130,11 @@ def iter_pydantic_models(annotation: Any) ->
Iterator[type]:
Handles a bare model class, ``Optional`` / ``Union`` of models, and
parameterized containers such as ``list[MyModel]`` -- the shapes accepted
as
- an operator ``output_type``. The agent (or operator) may emit an instance
of
- any model reachable from the annotation, so each must be registered for
XCom
- deserialization, not just the top-level type.
+ an operator ``output_type`` -- and recurses into a model's own fields so
+ models nested inside it (e.g. ``SubQuestion`` reachable via
+ ``DecomposedQuestion.sub_questions``) are yielded too. A task may push a
+ nested model on to XCom by itself, so each reachable model must be
registered
+ for deserialization, not just the top-level type.
"""
seen: set[Any] = set()
stack: list[Any] = [annotation]
@@ -151,6 +153,13 @@ def iter_pydantic_models(annotation: Any) ->
Iterator[type]:
seen.add(tp)
if is_pydantic_model(tp):
yield tp
+ # A model's fields may reference further models; walk them so a
+ # value nested inside the declared type is deserializable too.
+ # ``seen`` makes self-referential models terminate. ``getattr``
+ # because ``is_pydantic_model`` guarantees ``model_fields``
exists
+ # but does not narrow ``tp`` (typed ``type``) for the type
checker.
+ for field in getattr(tp, "model_fields", {}).values():
+ stack.append(field.annotation)
def decode(d: dict[str, Any]) -> tuple[str, int, Any]:
diff --git a/task-sdk/tests/task_sdk/serde/test_serde.py
b/task-sdk/tests/task_sdk/serde/test_serde.py
index bf41f4e3ed4..16d6851b40b 100644
--- a/task-sdk/tests/task_sdk/serde/test_serde.py
+++ b/task-sdk/tests/task_sdk/serde/test_serde.py
@@ -199,6 +199,27 @@ class C:
return None
+class NestedLeaf(BaseModel):
+ value: int
+
+
+class NestedMid(BaseModel):
+ leaves: list[NestedLeaf]
+
+
+class NestedRoot(BaseModel):
+ mid: NestedMid
+ maybe: NestedLeaf | None
+
+
+class SelfRefNode(BaseModel):
+ name: str
+ children: list[SelfRefNode] = []
+
+
+SelfRefNode.model_rebuild()
+
+
@pytest.mark.usefixtures("recalculate_patterns")
class TestSerDe:
def test_ser_primitives(self):
@@ -475,6 +496,16 @@ class TestSerDe:
assert set(iter_pydantic_models(str)) == set()
assert set(iter_pydantic_models(list[int])) == set()
+ def test_iter_pydantic_models_recurses_into_fields(self):
+ """Models nested inside a model's fields are yielded, not just the
top-level type."""
+ assert set(iter_pydantic_models(NestedRoot)) == {NestedRoot,
NestedMid, NestedLeaf}
+ # Holds when the declared type is a container of the root model.
+ assert set(iter_pydantic_models(list[NestedRoot])) == {NestedRoot,
NestedMid, NestedLeaf}
+
+ def test_iter_pydantic_models_terminates_on_self_reference(self):
+ """A self-referential model must not loop forever."""
+ assert set(iter_pydantic_models(SelfRefNode)) == {SelfRefNode}
+
def test_incompatible_version(self):
data = dict(
{