This is an automated email from the ASF dual-hosted git repository.

vatsrahul1001 pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-2-test by this push:
     new bd425efc1b1 Fix pod_override serialization in DAG details and executor path (#65407) (#66898)
bd425efc1b1 is described below

commit bd425efc1b1d2aace12c2ff6f2ff0c8a51a7cdf0
Author: Rahul Vats <[email protected]>
AuthorDate: Fri May 15 10:49:04 2026 +0530

    Fix pod_override serialization in DAG details and executor path (#65407) (#66898)
    
    Sanitize default_args["executor_config"]["pod_override"] in DAG
    details responses without changing other default_args values,
    and make V1Pod serialization force the Kubernetes import path
    so executor config no longer falls back to stringification.
    
    This fixes two related Kubernetes serialization bugs: one where
    the DAG details API could fail when pod_override was present in
    default_args, and another where V1Pod objects could be flattened
    into strings before reaching the Kubernetes executor. The updated
    tests cover both the API response behavior and the serializer
    regression with real Kubernetes pod objects.
    
    (cherry picked from commit ff15983262376fb375a67e23ecbc5c2a32786fa0)
    
    Co-authored-by: Ephraim Anierobi <[email protected]>
---
 .../api_fastapi/core_api/datamodels/dags.py        | 37 +++++++++
 .../airflow/serialization/serialized_objects.py    |  6 +-
 .../api_fastapi/core_api/datamodels/test_dags.py   | 95 ++++++++++++++++++++++
 .../unit/serialization/test_serialized_objects.py  | 23 ++++++
 4 files changed, 160 insertions(+), 1 deletion(-)

diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dags.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dags.py
index ea73f1fca13..f98ced89f02 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dags.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dags.py
@@ -33,6 +33,7 @@ from pydantic import (
     field_validator,
 )
 
+from airflow._shared.module_loading import qualname
 from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel, make_partial_model
 from airflow.api_fastapi.core_api.datamodels.dag_tags import DagTagResponse
 from airflow.api_fastapi.core_api.datamodels.dag_versions import DagVersionResponse
@@ -44,6 +45,11 @@ if TYPE_CHECKING:
     from airflow.serialization.definitions.param import SerializedParamsDict
 
 
+def _is_response_safe_pod_override(value: Any) -> bool:
+    """Whether a pod_override value is already safe to preserve in the response."""
+    return value is None or isinstance(value, str | int | float | Mapping | list)
+
+
 @cache
 def _get_file_token_serializer() -> URLSafeSerializer:
     """
@@ -204,6 +210,37 @@ class DAGDetailsResponse(DAGResponse):
             return None
         return inspect.cleandoc(doc_md)
 
+    @field_validator("default_args", mode="before")
+    @classmethod
+    def get_default_args(cls, default_args: Mapping | None) -> Mapping | None:
+        """
+        Sanitize default_args for the API response.
+
+        Targets the common case where ``executor_config["pod_override"]`` is a
+        Kubernetes ``V1Pod``: when the value is not a JSON primitive
+        (``None``/``str``/``int``/``float``) or a ``Mapping``/``list``, it is
+        rewritten to a fully-qualified type-name string so the response stays
+        valid JSON. The container check is shallow — a ``Mapping`` or ``list``
+        whose contents are themselves non-serializable (e.g. nested ``V1Pod``)
+        will still raise during response serialization, as will any other
+        non-JSON values elsewhere in ``default_args``.
+        """
+        if default_args is None:
+            return None
+        executor_config = default_args.get("executor_config")
+        if not (isinstance(executor_config, Mapping) and "pod_override" in executor_config):
+            return default_args
+
+        pod_override = executor_config["pod_override"]
+        if _is_response_safe_pod_override(pod_override):
+            return default_args
+
+        sanitized_executor_config = dict(executor_config)
+        sanitized_executor_config["pod_override"] = qualname(pod_override)
+        result = dict(default_args)
+        result["executor_config"] = sanitized_executor_config
+        return result
+
     @field_validator("params", mode="before")
     @classmethod
     def get_params(cls, params: SerializedParamsDict | None) -> dict | None:
diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py
index 9a0c7ab11d7..a0295dc2370 100644
--- a/airflow-core/src/airflow/serialization/serialized_objects.py
+++ b/airflow-core/src/airflow/serialization/serialized_objects.py
@@ -491,7 +491,11 @@ class BaseSerialization:
             )
         elif isinstance(var, list):
             return [cls.serialize(v, strict=strict) for v in var]
-        elif var.__class__.__name__ == "V1Pod" and _has_kubernetes() and isinstance(var, k8s.V1Pod):
+        elif (
+            var.__class__.__name__ == "V1Pod"
+            and _has_kubernetes(attempt_import=True)
+            and isinstance(var, k8s.V1Pod)
+        ):
             json_pod = PodGenerator.serialize_pod(var)
             return cls._encode(json_pod, type_=DAT.POD)
         elif isinstance(var, OutletEventAccessors):
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/datamodels/test_dags.py b/airflow-core/tests/unit/api_fastapi/core_api/datamodels/test_dags.py
new file mode 100644
index 00000000000..5e9a226d7c4
--- /dev/null
+++ b/airflow-core/tests/unit/api_fastapi/core_api/datamodels/test_dags.py
@@ -0,0 +1,95 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from datetime import datetime, timedelta, timezone
+
+import pytest
+
+from airflow._shared.module_loading import qualname
+from airflow.api_fastapi.core_api.datamodels.dags import DAGDetailsResponse
+
+
+class TestGetDefaultArgsValidator:
+    """Test the get_default_args field_validator on DAGDetailsResponse."""
+
+    def _call_validator(self, value):
+        """Invoke the classmethod validator directly."""
+        return DAGDetailsResponse.get_default_args(value)
+
+    def test_none_returns_none(self):
+        assert self._call_validator(None) is None
+
+    def test_plain_dict_is_preserved(self):
+        result = self._call_validator({"retries": 3, "depends_on_past": False})
+        assert result == {"retries": 3, "depends_on_past": False}
+
+    def test_timedelta_values_are_preserved(self):
+        td = timedelta(minutes=5)
+        result = self._call_validator({"retry_delay": td})
+        assert result == {"retry_delay": td}
+
+    def test_datetime_values_are_preserved(self):
+        start_date = datetime(2024, 1, 1, tzinfo=timezone.utc)
+        result = self._call_validator({"start_date": start_date})
+        assert result == {"start_date": start_date}
+
+    def test_pod_override_is_replaced_with_type_name(self):
+        k8s = pytest.importorskip("kubernetes.client.models")
+        pod = k8s.V1Pod(metadata=k8s.V1ObjectMeta(name="test-pod"))
+        result = self._call_validator({"executor_config": {"pod_override": pod, "namespace": "custom"}})
+        assert result == {"executor_config": {"pod_override": qualname(pod), "namespace": "custom"}}
+
+    @pytest.mark.parametrize(
+        "pod_override",
+        [
+            pytest.param(None, id="none"),
+            pytest.param("already-serialized", id="string"),
+            pytest.param({"metadata": {"name": "pod"}}, id="dict"),
+            pytest.param([{"metadata": {"name": "pod"}}], id="list"),
+        ],
+    )
+    def test_serialized_pod_override_values_are_preserved(self, pod_override):
+        result = self._call_validator({"executor_config": {"pod_override": pod_override}})
+        assert result == {"executor_config": {"pod_override": pod_override}}
+
+    def test_serialized_pod_override_preserves_other_executor_config_keys(self):
+        executor_config = {
+            "pod_override": {"metadata": {"name": "pod"}},
+            "KubernetesExecutor": {"image": "custom-image"},
+        }
+
+        result = self._call_validator({"executor_config": executor_config})
+
+        assert result == {"executor_config": executor_config}
+
+    def test_non_serialized_pod_override_object_is_replaced_with_type_name(self):
+        class Opaque:
+            pass
+
+        value = Opaque()
+        result = self._call_validator({"executor_config": {"pod_override": value}})
+        assert result == {"executor_config": {"pod_override": qualname(value)}}
+
+    def test_non_pod_override_objects_are_left_unchanged(self):
+        class Opaque:
+            def to_dict(self):
+                return {"password": "secret"}
+
+        value = Opaque()
+        result = self._call_validator({"connection": value})
+        assert result["connection"] is value
diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py b/airflow-core/tests/unit/serialization/test_serialized_objects.py
index f59939be9b2..c117561273b 100644
--- a/airflow-core/tests/unit/serialization/test_serialized_objects.py
+++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py
@@ -86,6 +86,7 @@ from airflow.sdk.definitions.operator_resources import Resources
 from airflow.sdk.definitions.param import Param
 from airflow.sdk.definitions.taskgroup import TaskGroup
 from airflow.sdk.execution_time.context import OutletEventAccessor, OutletEventAccessors
+from airflow.serialization import serialized_objects
 from airflow.serialization.definitions.assets import (
     SerializedAsset,
     SerializedAssetAlias,
@@ -1049,3 +1050,25 @@ class TestKubernetesImportAvoidance:
         result = _has_kubernetes()
 
         assert result is True
+
+    def test_serialize_v1pod_attempts_import_before_serializing(self, monkeypatch):
+        """Regression test: V1Pod serialization must call _has_kubernetes(attempt_import=True)."""
+        k8s = pytest.importorskip("kubernetes.client.models")
+        from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
+
+        calls = []
+
+        def fake_has_kubernetes(*, attempt_import=False):
+            calls.append(attempt_import)
+            return True
+
+        monkeypatch.setattr(serialized_objects, "_has_kubernetes", fake_has_kubernetes)
+        monkeypatch.setattr(serialized_objects, "k8s", k8s, raising=False)
+        monkeypatch.setattr(serialized_objects, "PodGenerator", PodGenerator, raising=False)
+
+        pod = k8s.V1Pod(metadata=k8s.V1ObjectMeta(name="test-pod"))
+        result = BaseSerialization.serialize(pod)
+
+        assert isinstance(result, dict), "V1Pod should serialize to a dict, not a string"
+        assert result.get(Encoding.TYPE) == DAT.POD, "V1Pod should have type DAT.POD"
+        assert True in calls

Reply via email to