This is an automated email from the ASF dual-hosted git repository.

ephraimbuddy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 90051561e72 fix serialize_template_field handling callable value in dict (#63871)
90051561e72 is described below

commit 90051561e721d24834f79d5e1335708c671a9be9
Author: Jeongwoo Do <[email protected]>
AuthorDate: Fri May 15 03:17:24 2026 +0900

    fix serialize_template_field handling callable value in dict (#63871)
    
    * Fix non-deterministic serialization of non-jsonable objects in template fields
    
    * fix logic
    
    * fix logic
    
    * fix logic
    
    * fix logic
---
 airflow-core/src/airflow/serialization/helpers.py  | 123 +++--
 .../tests/unit/dags/test_dag_decorator_version.py  |  63 +++
 .../tests/unit/models/test_renderedtifields.py     |   4 +-
 .../unit/serialization/test_dag_serialization.py   |  39 ++
 .../tests/unit/serialization/test_helpers.py       | 606 +++++++++++++++++++++
 .../src/airflow/sdk/execution_time/task_runner.py  | 134 ++---
 .../task_sdk/execution_time/test_task_runner.py    |  13 +-
 7 files changed, 855 insertions(+), 127 deletions(-)

diff --git a/airflow-core/src/airflow/serialization/helpers.py b/airflow-core/src/airflow/serialization/helpers.py
index e2c8069a116..83b57d1c7cc 100644
--- a/airflow-core/src/airflow/serialization/helpers.py
+++ b/airflow-core/src/airflow/serialization/helpers.py
@@ -19,87 +19,96 @@
 from __future__ import annotations
 
 import contextlib
+import inspect
 from typing import TYPE_CHECKING, Any
 
 from airflow._shared.module_loading import qualname
 from airflow._shared.secrets_masker import redact
 from airflow._shared.template_rendering import truncate_rendered_value
 from airflow.configuration import conf
-from airflow.settings import json
 
 if TYPE_CHECKING:
     from airflow.partition_mappers.base import PartitionMapper
     from airflow.timetables.base import Timetable as CoreTimetable
 
 
-def serialize_template_field(template_field: Any, name: str) -> str | dict | list | int | float:
+def serialize_template_field(template_field: Any, name: str) -> str | dict | list | int | float | bool | None:
     """
     Return a serializable representation of the templated field.
 
-    If ``templated_field`` is provided via a callable then
-    return the following serialized value: ``<callable full_qualified_name>``
+    The walk has two responsibilities:
 
-    If ``templated_field`` contains a class or instance that requires recursive
-    templating, store them as strings. Otherwise simply return the field as-is.
+    1. **Make the template_field JSON-encodable** — every container is rebuilt
+       with primitive leaves (str/int/float/bool/None), tuples and sets are
+       flattened to lists, and unsupported objects fall through to ``str()``
+       so ``json.dumps`` never raises on the result.
+    2. **Keep the output deterministic across parses** — callables are replaced
+       with their qualified name (never the default ``<function ... at 0x...>``
+       repr), dicts are key-sorted, and (frozen)sets are sorted by element so
+       the same input always produces the same string.
     """
 
-    def is_jsonable(x):
-        try:
-            json.dumps(x)
-        except (TypeError, OverflowError):
-            return False
-        else:
-            return True
-
-    def translate_tuples_to_lists(obj: Any):
-        """Recursively convert tuples to lists."""
-        if isinstance(obj, tuple):
-            return [translate_tuples_to_lists(item) for item in obj]
-        if isinstance(obj, list):
-            return [translate_tuples_to_lists(item) for item in obj]
-        if isinstance(obj, dict):
-            return {key: translate_tuples_to_lists(value) for key, value in 
obj.items()}
-        return obj
+    def normalize_dict_key(key) -> str:
+        """Normalize a dict key to a serialized string type."""
+        # Serialized template_field keys must all be strings, not a mix of 
types, so that
+        # downstream json.dumps(..., sort_keys=True) does not raise on 
mixed-type keys.
+        return str(serialize_object(key))
+
+    def serialize_object(obj):
+        """Recursively rewrite ``obj`` into a JSON-encodable, hash-stable 
structure."""
+        if obj is None or isinstance(obj, (str, int, float, bool)):
+            return obj
 
-    def sort_dict_recursively(obj: Any) -> Any:
-        """Recursively sort dictionaries to ensure consistent ordering."""
         if isinstance(obj, dict):
-            return {k: sort_dict_recursively(v) for k, v in 
sorted(obj.items())}
-        if isinstance(obj, list):
-            return [sort_dict_recursively(item) for item in obj]
-        if isinstance(obj, tuple):
-            return tuple(sort_dict_recursively(item) for item in obj)
-        return obj
+            # Serialize keys/values first so each key is a string and the 
output is hash-stable,
+            # then sort by the serialized key to prevent hash inconsistencies 
when dict ordering varies.
+            serialized_pairs = [(normalize_dict_key(k), serialize_object(v)) 
for k, v in obj.items()]
+            return dict(sorted(serialized_pairs, key=lambda kv: kv[0]))
+
+        if isinstance(obj, (list, tuple)):
+            return [serialize_object(item) for item in obj]
+
+        if isinstance(obj, (set, frozenset)):
+            # JSON has no set type → flatten to a list with deterministic 
ordering
+            # so hash randomization on element types cannot shift 
cross-process iteration order.
+            serialized_set = [serialize_object(e) for e in obj]
+            return sorted(serialized_set, key=lambda x: (type(x).__name__, 
str(x)))
+
+        # Use inspect.getattr_static to bypass any custom __getattr__ / 
metaclass magic
+        if callable(inspect.getattr_static(obj, "serialize", None)):
+            return serialize_object(obj.serialize())
+
+        # Kubernetes client objects (V1Pod, V1Container, ...) expose their 
content via to_dict().
+        # Scope the branch to the kubernetes namespace so unrelated user 
classes that happen to
+        # define a to_dict() method fall through to str() instead of being 
treated as K8s payloads.
+        if getattr(type(obj), "__module__", "").startswith(
+            ("kubernetes.", "kubernetes_asyncio.")
+        ) and callable(inspect.getattr_static(obj, "to_dict", None)):
+            return serialize_object(obj.to_dict())
+
+        if callable(obj):
+            # Use qualified name; default repr embeds memory addresses, which 
would change the DAG hash on every parse
+            return f"<callable {qualname(obj, True)}>"
+
+        # A custom __str__ or __repr__ is treated as an intentional textual 
representation
+        # supplied by the author and used as-is.
+        if type(obj).__str__ is not object.__str__ or type(obj).__repr__ is 
not object.__repr__:
+            return str(obj)
+
+        # Otherwise fall back to a qualname marker. The default object repr is
+        # `<ClassName object at 0x...>`, which embeds a memory address that 
flips per process
+        # and would break DAG hash stability — use the class qualname instead.
+        return f"<{qualname(type(obj), True)} object>"
 
     max_length = conf.getint("core", "max_templated_field_length")
 
-    if not is_jsonable(template_field):
-        try:
-            serialized = template_field.serialize()
-        except AttributeError:
-            if callable(template_field):
-                full_qualified_name = qualname(template_field, True)
-                serialized = f"<callable {full_qualified_name}>"
-            else:
-                serialized = str(template_field)
-        if len(serialized) > max_length:
-            rendered = redact(serialized, name)
-            return truncate_rendered_value(str(rendered), max_length)
-        return serialized
-    if not template_field and not isinstance(template_field, tuple):
-        # Avoid unnecessary serialization steps for empty fields unless they 
are tuples
-        # and need to be converted to lists
-        return template_field
-    template_field = translate_tuples_to_lists(template_field)
-    # Sort dictionaries recursively to ensure consistent string representation
-    # This prevents hash inconsistencies when dict ordering varies
-    if isinstance(template_field, dict):
-        template_field = sort_dict_recursively(template_field)
-    serialized = str(template_field)
-    if len(serialized) > max_length:
-        rendered = redact(serialized, name)
+    serialized = serialize_object(template_field)
+
+    if len(str(serialized)) > max_length:
+        rendered = redact(str(serialized), name)
         return truncate_rendered_value(str(rendered), max_length)
-    return template_field
+
+    return serialized
 
 
 class TimetableNotRegistered(ValueError):
diff --git a/airflow-core/tests/unit/dags/test_dag_decorator_version.py b/airflow-core/tests/unit/dags/test_dag_decorator_version.py
new file mode 100644
index 00000000000..35fd0c98bb9
--- /dev/null
+++ b/airflow-core/tests/unit/dags/test_dag_decorator_version.py
@@ -0,0 +1,63 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from datetime import datetime
+
+from airflow.sdk import dag, task, task_group
+
+
+@dag(
+    dag_id="TEST_DTM",
+    dag_display_name="TEST DTM",
+    schedule=None,
+    default_args={"owner": "airflow", "email": ""},
+    start_date=datetime(2024, 1, 25),
+)
+def dtm_test(
+    exponent: int = 2,
+):
+
+    @task
+    def get_data():
+        return [20, 100, 200, 222, 242, 272]
+
+    @task
+    def to_exp(number: int, exponent: int) -> float:
+        return number**exponent
+
+    @task
+    def trunc(number: float, digits: int) -> float:
+        return round(number / 22, digits)
+
+    @task
+    def save(number: list[float]):
+        for n in number:
+            print(f"Got number: {n}")
+
+    @task_group  # type: ignore[type-var]
+    def transform(number: int, exponent: int) -> float:
+        a = to_exp(number, exponent)
+        b = trunc(a, 2)
+        return b
+
+    data = get_data()
+    result = transform.partial(exponent=exponent).expand(number=data)
+    save(result)  # type: ignore[arg-type]
+
+
+instance = dtm_test()
diff --git a/airflow-core/tests/unit/models/test_renderedtifields.py b/airflow-core/tests/unit/models/test_renderedtifields.py
index d42ed06b033..37e6088494d 100644
--- a/airflow-core/tests/unit/models/test_renderedtifields.py
+++ b/airflow-core/tests/unit/models/test_renderedtifields.py
@@ -116,11 +116,11 @@ class TestRenderedTaskInstanceFields:
             pytest.param([], [], id="list"),
             pytest.param({}, {}, id="empty_dict"),
             pytest.param((), [], id="empty_tuple"),
-            pytest.param(set(), "set()", id="empty_set"),
+            pytest.param(set(), [], id="empty_set"),
             pytest.param("test-string", "test-string", id="string"),
             pytest.param({"foo": "bar"}, {"foo": "bar"}, id="dict"),
             pytest.param(("foo", "bar"), ["foo", "bar"], id="tuple"),
-            pytest.param({"foo"}, "{'foo'}", id="set"),
+            pytest.param({"foo"}, ["foo"], id="set"),
             (date(2018, 12, 6), "2018-12-06"),
             pytest.param(datetime(2018, 12, 6, 10, 55), "2018-12-06 
10:55:00+00:00", id="datetime"),
             pytest.param(
diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py b/airflow-core/tests/unit/serialization/test_dag_serialization.py
index fd333d5d799..0a40f67314e 100644
--- a/airflow-core/tests/unit/serialization/test_dag_serialization.py
+++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py
@@ -82,6 +82,7 @@ from airflow.serialization.json_schema import load_dag_schema_dict
 from airflow.serialization.serialized_objects import (
     BaseSerialization,
     DagSerialization,
+    LazyDeserializedDAG,
     OperatorSerialization,
     _XComRef,
 )
@@ -114,6 +115,7 @@ from tests_common.test_utils.timetables import (
     cron_timetable,
     delta_timetable,
 )
+from unit.models import TEST_DAGS_FOLDER
 
 if TYPE_CHECKING:
     from airflow.sdk.definitions.context import Context
@@ -704,6 +706,43 @@ class TestStringifiedDAGs:
         for dag_id in stringified_dags:
             self.validate_deserialized_dag(stringified_dags[dag_id], 
dags[dag_id])
 
+    @pytest.mark.db_test
+    @conf_vars({("core", "load_examples"): "false"})
+    def test_reserialize_should_make_equal_hash_with_dag_processor(self):
+        dagbag1 = DagBag(TEST_DAGS_FOLDER / "test_dag_decorator_version.py")
+        hash_result1 = 
LazyDeserializedDAG.from_dag(next(iter(dagbag1.dags.values()))).hash
+
+        dagbag2 = DagBag(TEST_DAGS_FOLDER / "test_dag_decorator_version.py")
+        hash_result2 = 
LazyDeserializedDAG.from_dag(next(iter(dagbag2.dags.values()))).hash
+
+        assert hash_result1 == hash_result2
+
+    @pytest.mark.db_test
+    @conf_vars({("core", "load_examples"): "false"})
+    def test_hash_succeeds_for_dag_with_mixed_primitive_key_template_field(self):
+        """SerializedDagModel.hash() must not raise on a template field whose 
dict has mixed-type primitive keys.
+
+        Building the Dag twice via ``create_dag()`` produces independent Dag 
and
+        operator instances, so the hashes must also be equal across calls —
+        otherwise the serialization path is leaking non-deterministic state
+        (memory addresses, dict ordering, etc.) into the hash.
+        """
+        from airflow.providers.standard.operators.python import PythonOperator
+
+        def create_dag():
+            with DAG(dag_id="dag_mixed_keys", schedule=None, 
start_date=datetime(2024, 1, 1)) as dag:
+                PythonOperator(
+                    task_id="op",
+                    python_callable=empty_function,
+                    op_kwargs={"data": {1: "a", "b": "c", None: "z", 2: "d"}, 
empty_function: "t"},
+                )
+            return dag
+
+        first_hash = LazyDeserializedDAG.from_dag(create_dag()).hash
+        second_hash = LazyDeserializedDAG.from_dag(create_dag()).hash
+
+        assert first_hash == second_hash
+
     @skip_if_force_lowest_dependencies_marker
     @pytest.mark.db_test
     def test_roundtrip_provider_example_dags(self):
diff --git a/airflow-core/tests/unit/serialization/test_helpers.py b/airflow-core/tests/unit/serialization/test_helpers.py
index 94fb26525c7..0dbd70fd747 100644
--- a/airflow-core/tests/unit/serialization/test_helpers.py
+++ b/airflow-core/tests/unit/serialization/test_helpers.py
@@ -16,6 +16,10 @@
 # under the License.
 from __future__ import annotations
 
+import json
+
+import pytest
+
 from airflow.sdk.definitions._internal.types import SET_DURING_EXECUTION
 from airflow.serialization.definitions.notset import NOTSET
 from airflow.serialization.helpers import serialize_template_field
@@ -33,6 +37,16 @@ def test_serialize_template_field_with_very_small_max_length(monkeypatch):
     assert "Truncated. You can change this behaviour" in result
 
 
+def test_serialize_template_field_truncation_kicks_in(monkeypatch):
+    """Long serialized output must be truncated with the standard message."""
+    monkeypatch.setenv("AIRFLOW__CORE__MAX_TEMPLATED_FIELD_LENGTH", "20")
+
+    long_value = {"k": "x" * 500}
+    result = serialize_template_field(long_value, "field")
+
+    assert "Truncated. You can change this behaviour" in result
+
+
 def test_serialize_template_field_with_notset():
     """NOTSET must serialize deterministically via serialize(), not str() 
fallback."""
     result = serialize_template_field(NOTSET, "logical_date")
@@ -51,3 +65,595 @@ def test_argnotset_repr_and_str():
     assert str(NOTSET) == "NOTSET"
     assert repr(SET_DURING_EXECUTION) == "DYNAMIC (set during execution)"
     assert str(SET_DURING_EXECUTION) == "DYNAMIC (set during execution)"
+
+
+def test_serialize_template_field_with_dict_value_callable():
+
+    def fn_returns_callable():
+        def get_arg():
+            pass
+
+        return get_arg
+
+    template_name = "op_kwargs"
+
+    def make_value():
+        return {"values": [3, 1, 2], "sort_key": lambda x: x}
+
+    result1 = serialize_template_field(make_value(), template_name)
+    result2 = serialize_template_field(make_value(), template_name)
+
+    assert result1 == result2
+
+    def make_value_nested():
+        return {
+            "values": [3, 1, 2],
+            "sort_key_nested": {"b": lambda x: fn_returns_callable(), "a": 
"test"},
+        }
+
+    result1_nested = serialize_template_field(make_value_nested(), 
template_name)
+    result2_nested = serialize_template_field(make_value_nested(), 
template_name)
+
+    assert result1_nested == result2_nested
+
+
+def test_serialize_template_field_with_mixed_key_dict_and_callable():
+    """Mixed-key dicts containing callables must serialize deterministically 
without TypeError."""
+    template_name = "op_kwargs"
+
+    def make_value():
+        return {1: "a", "b": lambda x: x, 2: "c"}
+
+    result1 = serialize_template_field(make_value(), template_name)
+    result2 = serialize_template_field(make_value(), template_name)
+
+    assert result1 == result2
+    assert any(isinstance(v, str) and "<callable " in v for v in 
result1.values())
+
+
+def test_serialize_template_field_with_mixed_key_jsonable_dict():
+    """Jsonable mixed-key dicts must not raise when sorted for deterministic 
output."""
+    template_name = "op_kwargs"
+
+    def make_value():
+        return {1: "a", "b": "c", 2: "d", 3: True}
+
+    result1 = serialize_template_field(make_value(), template_name)
+    result2 = serialize_template_field(make_value(), template_name)
+
+    assert result1 == result2
+
+
+@pytest.mark.parametrize(
+    "value",
+    [None, "hello", 0, 42, -1, 3.14, True, False],
+    ids=["none", "str", "zero", "int", "neg_int", "float", "true", "false"],
+)
+def test_serialize_template_field_primitives_pass_through(value):
+    """Primitives (None, str, int, float, bool) must be returned unchanged and 
keep their type."""
+    result = serialize_template_field(value, "field")
+    assert result == value
+    assert type(result) is type(value)
+
+
+def test_serialize_template_field_tuple_becomes_list():
+    """Top-level and nested tuples must flatten to lists for JSON 
compatibility."""
+    result = serialize_template_field((1, 2, (3, 4)), "field")
+    assert result == [1, 2, [3, 4]]
+
+
+def test_serialize_template_field_tuple_key_normalized():
+    """Tuple keys must be normalized to a string so the dict stays 
JSON-encodable."""
+    result1 = serialize_template_field({(1, 2): "v", (3, 4): "w"}, "op_kwargs")
+    result2 = serialize_template_field({(3, 4): "w", (1, 2): "v"}, "op_kwargs")
+
+    assert result1 == result2
+    assert all(isinstance(k, str) for k in result1)
+    json.dumps(result1)  # must not raise
+
+
+def test_serialize_template_field_frozenset_key_normalized():
+    """Frozenset keys must be normalized to a string."""
+    result = serialize_template_field({frozenset([1, 2]): "v"}, "op_kwargs")
+    assert isinstance(next(iter(result)), str)
+    json.dumps(result)
+
+
+def test_serialize_template_field_callable_key_uses_qualname():
+    """Callable keys must serialize via qualname so memory addresses don't 
leak into the hash."""
+
+    def my_fn():
+        pass
+
+    result = serialize_template_field({my_fn: "v"}, "op_kwargs")
+    key = next(iter(result))
+    assert key.startswith("<callable ")
+    assert "my_fn" in key
+    assert "at 0x" not in key
+
+
+def test_serialize_template_field_mixed_exotic_keys_deterministic():
+    """A dict with str, int, tuple, and callable keys must serialize the same 
way every call."""
+
+    def my_fn():
+        pass
+
+    def make_value():
+        return {"a": 1, 2: "b", (3, 4): "c", my_fn: "d"}
+
+    r1 = serialize_template_field(make_value(), "op_kwargs")
+    r2 = serialize_template_field(make_value(), "op_kwargs")
+    assert r1 == r2
+    json.dumps(r1)
+
+
+def test_serialize_template_field_object_with_serialize_method():
+    """An object exposing serialize() must use it (recursively) instead of 
str()."""
+
+    class Custom:
+        def serialize(self):
+            return {"kind": "custom", "values": (1, 2, 3)}
+
+    result = serialize_template_field(Custom(), "field")
+    assert result == {"kind": "custom", "values": [1, 2, 3]}
+
+
+def test_serialize_template_field_object_with_getattr_no_serialize():
+    """Objects with custom __getattr__ but no real serialize attribute must 
fall through to str()."""
+
+    class Tricky:
+        def __getattr__(self, item):
+            # Mimic SQLAlchemy / proxy objects that return *something* for any 
attribute access
+            return lambda *a, **kw: "should-not-be-called"
+
+        def __str__(self):
+            return "tricky-object"
+
+    result = serialize_template_field(Tricky(), "field")
+    assert result == "tricky-object"
+
+
+def test_serialize_template_field_non_kubernetes_to_dict_falls_through_to_str():
+    """User classes that happen to define to_dict() must not be treated as K8s 
payloads."""
+
+    class CustomWithToDict:
+        def to_dict(self):
+            return {"should": "not be used"}
+
+        def __str__(self):
+            return "custom-via-str"
+
+    result = serialize_template_field(CustomWithToDict(), "field")
+    assert result == "custom-via-str"
+
+
+def test_serialize_template_field_kubernetes_object_uses_to_dict():
+    """Objects whose class is defined under the kubernetes.* namespace are 
normalized via to_dict()."""
+
+    class FakeK8sObject:
+        def to_dict(self):
+            return {"kind": "Pod", "metadata": {"name": "test"}}
+
+    FakeK8sObject.__module__ = "kubernetes.client.models.v1_pod"
+
+    result = serialize_template_field(FakeK8sObject(), "field")
+    assert result == {"kind": "Pod", "metadata": {"name": "test"}}
+
+
+def test_serialize_template_field_bytes_become_str():
+    """Bytes are not JSON-encodable; they must be coerced via str()."""
+    result = serialize_template_field(b"binary", "field")
+    assert isinstance(result, str)
+
+
+def test_serialize_template_field_no_memory_address_in_output():
+    """Output must never contain `<function ... at 0x...>` repr leaks (which 
would break DAG hashing)."""
+
+    def my_fn():
+        pass
+
+    value = {
+        "a": my_fn,
+        "b": [my_fn, {"c": my_fn}],
+        my_fn: "as-key",
+        ("tup",): my_fn,
+    }
+    result = serialize_template_field(value, "op_kwargs")
+    assert "at 0x" not in str(result)
+
+
+def test_serialize_template_field_plain_object_has_no_memory_address():
+    """Objects relying on the default object.__str__ would leak `<ClassName 
object at 0x...>`."""
+
+    class Opaque:
+        pass
+
+    result = serialize_template_field(Opaque(), "field")
+    assert isinstance(result, str)
+    assert "at 0x" not in result
+    assert "Opaque" in result
+
+
+def test_serialize_template_field_plain_object_repr_preserved_when_custom():
+    """A user-defined __repr__ is a meaningful representation and must be kept 
as-is."""
+
+    class WithRepr:
+        def __repr__(self):
+            return "stable-repr"
+
+    result = serialize_template_field(WithRepr(), "field")
+    assert result == "stable-repr"
+
+
+def test_serialize_template_field_set_of_plain_objects_is_deterministic():
+    """Repeated serialization of a set of plain objects must produce identical 
output across calls."""
+
+    class Opaque:
+        pass
+
+    first = serialize_template_field({Opaque(), Opaque()}, "field")
+    second = serialize_template_field({Opaque(), Opaque()}, "field")
+    assert first == second
+    assert "at 0x" not in str(first)
+
+
+def test_serialize_template_field_output_is_jsonable():
+    """Whatever shape we pass in, the result must be directly 
JSON-encodable."""
+
+    def my_fn():
+        pass
+
+    value = {
+        "callable_value": my_fn,
+        "nested": {"list": [1, (2, 3), my_fn], "deep": {("k",): my_fn}},
+        frozenset([1, 2]): [my_fn],
+        my_fn: {"x": 1},
+    }
+    result = serialize_template_field(value, "op_kwargs")
+    json.dumps(result)
+
+
+def test_serialize_template_field_deeply_nested_determinism():
+    """Determinism across new instances of the same nested structure (key 
ordering must not matter)."""
+
+    def my_fn():
+        pass
+
+    def make_a():
+        return {
+            "z": [3, 2, 1],
+            "a": {"nested": my_fn, "items": (1, 2)},
+            10: ("x", "y"),
+        }
+
+    def make_b():
+        # Same content, different insertion order
+        return {
+            10: ("x", "y"),
+            "a": {"items": (1, 2), "nested": my_fn},
+            "z": [3, 2, 1],
+        }
+
+    assert serialize_template_field(make_a(), "f") == 
serialize_template_field(make_b(), "f")
+
+
+def test_serialize_template_field_bool_not_collapsed_to_int():
+    """bool must be preserved as bool (Python treats True == 1, but JSON 
distinguishes them)."""
+    result = serialize_template_field({"flag": True, "count": 1}, "op_kwargs")
+    assert result["flag"] is True
+    assert result["count"] == 1
+    assert type(result["flag"]) is bool
+
+
+def test_serialize_template_field_none_preserved():
+    """None must round-trip as None, not the string 'None'."""
+    result = serialize_template_field({"x": None, "y": [None, 1]}, "op_kwargs")
+    assert result == {"x": None, "y": [None, 1]}
+
+
+def test_serialize_template_field_list_with_callables_and_objects():
+    """Lists must recursively serialize callables and objects without leaking 
repr."""
+
+    def my_fn():
+        pass
+
+    class Custom:
+        def serialize(self):
+            return "custom-serialized"
+
+    result = serialize_template_field([1, my_fn, Custom(), (2, my_fn)], 
"field")
+    assert result[0] == 1
+    assert result[1].startswith("<callable ")
+    assert "my_fn" in result[1]
+    assert result[2] == "custom-serialized"
+    assert result[3][0] == 2
+    assert result[3][1].startswith("<callable ")
+
+
+def test_serialize_template_field_key_with_serialize_returning_nested_callable():
+    """A key whose .serialize() returns a structure containing callables must 
not leak memory addresses."""
+
+    def my_fn():
+        pass
+
+    class Custom:
+        def serialize(self):
+            return {"k": my_fn}  # nested callable inside serialize() output
+
+    result = serialize_template_field({Custom(): "v"}, "op_kwargs")
+    assert "at 0x" not in str(result)
+    json.dumps(result)
+
+
+def test_serialize_template_field_key_with_serialize_returning_primitive():
+    """A key whose .serialize() returns a primitive must use that primitive 
directly (no str() wrap)."""
+
+    class Custom:
+        def serialize(self):
+            return "stable-id-v1"
+
+    result = serialize_template_field({Custom(): "v"}, "op_kwargs")
+    assert result == {"stable-id-v1": "v"}
+
+
+def test_serialize_template_field_key_with_serialize_returning_list_with_callable():
+    """Sibling case to the dict-with-callable test: list output with nested 
callables must also be cleaned before str()."""
+
+    def my_fn():
+        pass
+
+    class Custom:
+        def serialize(self):
+            return [1, my_fn, (2, my_fn)]
+
+    result1 = serialize_template_field({Custom(): "v"}, "op_kwargs")
+    result2 = serialize_template_field({Custom(): "v"}, "op_kwargs")
+
+    key = next(iter(result1))
+    assert "at 0x" not in key
+    assert "<callable " in key
+    assert result1 == result2
+    json.dumps(result1)
+
+
+def test_serialize_template_field_key_falls_back_to_str_when_no_serialize():
+    """A non-primitive, non-callable key without .serialize() must use str() 
of the original object"""
+
+    class NoSerialize:
+        def __str__(self):
+            return "no-serialize-stringified"
+
+    result = serialize_template_field({NoSerialize(): "v"}, "op_kwargs")
+    assert result == {"no-serialize-stringified": "v"}
+
+
+def test_serialize_template_field_set_value_with_callable_no_memory_address_leak():
+    """A set containing a callable must replace the callable via qualname, not 
leak `at 0x...`."""
+
+    def my_fn():
+        pass
+
+    result = serialize_template_field({my_fn}, "op_kwargs")
+
+    assert "at 0x" not in str(result)
+    assert "<callable " in str(result)
+
+
+def test_serialize_template_field_frozenset_value_with_callable_no_memory_address_leak():
+    """Same regression as set, but with frozenset as a value."""
+
+    def my_fn():
+        pass
+
+    result = serialize_template_field({"items": frozenset([my_fn])}, 
"op_kwargs")
+
+    assert "at 0x" not in str(result)
+    assert "<callable " in str(result)
+
+
+def test_serialize_template_field_frozenset_key_with_callable_member_no_memory_address_leak():
+    """A frozenset key containing a callable must serialize without leaking 
memory addresses."""
+
+    def my_fn():
+        pass
+
+    # frozenset of hashables (functions are hashable) is a valid dict key
+    result = serialize_template_field({frozenset([my_fn]): "v"}, "op_kwargs")
+
+    key = next(iter(result))
+    assert "at 0x" not in key
+    assert "<callable " in key
+
+
+def test_serialize_template_field_set_value_flattens_to_list():
+    """Set must serialize to a JSON-compatible list, not a Python set repr 
string."""
+
+    result = serialize_template_field({"items": {1, 2, 3}}, "op_kwargs")
+
+    assert isinstance(result["items"], list)
+    assert sorted(result["items"]) == [1, 2, 3]
+    json.dumps(result)
+
+
+def test_serialize_template_field_set_of_strings_deterministic_ordering():
+    """Set of strings must serialize with deterministic ordering — not 
affected by PYTHONHASHSEED.
+
+    Sets are walked then sorted by (type_name, str(element)), so the output 
ordering
+    depends on the elements rather than on hash randomization across processes.
+    """
+    # Same content, two independent set instances
+    a = serialize_template_field({"items": {"banana", "apple", "cherry"}}, 
"op_kwargs")
+    b = serialize_template_field({"items": {"cherry", "banana", "apple"}}, 
"op_kwargs")
+
+    assert a == b
+    assert isinstance(a["items"], list)
+    assert a["items"] == sorted(a["items"])
+
+
+def test_serialize_template_field_nested_set_with_callable():
+    """Set nested deep inside a dict/list must still recursively clean 
callables."""
+
+    def my_fn():
+        pass
+
+    value = {"outer": [{"inner": {my_fn, "literal"}}]}
+    result = serialize_template_field(value, "op_kwargs")
+
+    assert "at 0x" not in str(result)
+    json.dumps(result)
+
+
+def test_serialize_template_field_callable_keys_sort_by_qualname_not_address():
+    """Two distinct named callables as dict keys must sort by qualname, not memory address.
+
+    Without this guarantee, two semantically-identical inputs that happen to allocate
+    the callables in a different order produce different serialized output, and re-parsing
+    the same Dag in another process can produce a different hash.
+    """
+
+    def fn_a():
+        pass
+
+    def fn_b():
+        pass
+
+    # Two dicts with the same content but different insertion orders must produce
+    # the same output once sorting is keyed on qualname.
+    r1 = serialize_template_field({fn_a: 1, fn_b: 2}, "op_kwargs")
+    r2 = serialize_template_field({fn_b: 2, fn_a: 1}, "op_kwargs")
+
+    assert r1 == r2
+    # The serialized iteration order must follow qualname (fn_a before fn_b),
+    # not memory address.
+    keys = list(r1.keys())
+    assert len(keys) == 2
+    assert "fn_a" in keys[0]
+    assert "fn_b" in keys[1]
+
+
+def test_serialize_template_field_lambda_keys_collapse_deterministically():
+    """Multiple lambdas as keys collapse to one entry deterministically across parses.
+
+    Each call to ``make_value()`` produces *new* lambda objects with new memory
+    addresses. The serialized result must not depend on those addresses.
+    """
+
+    def make_value():
+        # Two lambdas; both qualnames are ``<lambda>``, so they collapse to the same
+        # serialized key. The assertion below targets stability across calls,
+        # not key preservation between the two lambdas.
+        return {(lambda x: x): "a", (lambda y: y): "b"}
+
+    r1 = serialize_template_field(make_value(), "op_kwargs")
+    r2 = serialize_template_field(make_value(), "op_kwargs")
+
+    assert r1 == r2
+    assert "at 0x" not in str(r1)
+
+
+def test_serialize_template_field_dict_with_serializable_keys_sort_by_serialized_form():
+    """Custom objects whose .serialize() returns a stable string must be sorted by that string, not by repr."""
+
+    class StableId:
+        def __init__(self, name):
+            self.name = name
+
+        def serialize(self):
+            return self.name
+
+    # Insert in reverse alphabetical order — sorting by serialized form must reverse it.
+    r1 = serialize_template_field({StableId("zeta"): 1, StableId("alpha"): 2}, "op_kwargs")
+    r2 = serialize_template_field({StableId("alpha"): 2, StableId("zeta"): 1}, "op_kwargs")
+
+    assert r1 == r2
+    assert list(r1.keys()) == ["alpha", "zeta"]
+
+
+@pytest.mark.parametrize(
+    ("value", "expected_keys"),
+    [
+        ({1: "a", 2: "b"}, {"1", "2"}),
+        ({True: "a", False: "b"}, {"True", "False"}),
+        ({None: "a"}, {"None"}),
+        ({1.5: "a", 2.5: "b"}, {"1.5", "2.5"}),
+        ({1: "a", "b": "c"}, {"1", "b"}),
+    ],
+    ids=["int_keys", "bool_keys", "none_key", "float_keys", "mixed_int_str"],
+)
+def test_serialize_template_field_primitive_keys_coerced_to_string(value, expected_keys):
+    """All dict keys must be coerced to str so json.dumps(sort_keys=True) downstream cannot raise."""
+    result = serialize_template_field(value, "op_kwargs")
+    assert set(result.keys()) == expected_keys
+    assert all(isinstance(k, str) for k in result)
+
+
+def test_serialize_template_field_mixed_primitive_keys_jsonable_sort_keys():
+    """Output of mixed-type primitive keys must survive ``json.dumps(..., sort_keys=True)``."""
+    value = {1: "a", "b": "c", 2: "d", 3: True, None: "z", False: "y"}
+    result = serialize_template_field(value, "op_kwargs")
+
+    json.dumps(result, sort_keys=True)
+
+
+def test_serialize_template_field_mixed_primitive_keys_deterministic_across_calls():
+    """Same input parsed twice must yield identical output once keys are stringified."""
+
+    def fn_a():
+        pass
+
+    def fn_b():
+        pass
+
+    def make_value():
+        return {1: "a", "b": "c", 2: "d", None: "z", "test": fn_b, fn_a: 3.5}
+
+    assert serialize_template_field(make_value(), "op_kwargs") == serialize_template_field(
+        make_value(), "op_kwargs"
+    )
+
+
+def test_serialize_template_field_nested_mixed_primitive_keys_jsonable():
+    """Nested mixed-type primitive keys (dict inside dict) must also be coerced and jsonable."""
+    value = {"outer": {1: "a", "b": "c", None: "z"}}
+    result = serialize_template_field(value, "op_kwargs")
+
+    assert all(isinstance(k, str) for k in result["outer"])
+    json.dumps(result, sort_keys=True)
+
+
+def test_serialize_template_field_deeply_nested_dict_keys_recursively_normalized():
+    """Every nested dict must apply key normalization and sorting recursively.
+
+    Mixed-type primitive keys and callable keys appear at multiple depths; the
+    helper must stringify and sort them at each level so the full output is
+    deterministic across calls and safe for ``json.dumps(sort_keys=True)``.
+    """
+
+    def fn_inner():
+        pass
+
+    def make_value():
+        return {
+            "level1": {
+                1: "a",
+                fn_inner: {
+                    None: "deep",
+                    "nested_str": "value",
+                    2.5: {fn_inner: "deepest"},
+                },
+                "b": {3: "three", 4: "four"},
+            },
+        }
+
+    r1 = serialize_template_field(make_value(), "op_kwargs")
+    r2 = serialize_template_field(make_value(), "op_kwargs")
+
+    assert r1 == r2
+    assert all(isinstance(k, str) for k in r1["level1"])
+    callable_key = next(k for k in r1["level1"] if "fn_inner" in k)
+    inner = r1["level1"][callable_key]
+    assert all(isinstance(k, str) for k in inner)
+    float_key = next(k for k in inner if k == "2.5")
+    assert all(isinstance(k, str) for k in inner[float_key])
+    assert "at 0x" not in str(r1)
+    json.dumps(r1, sort_keys=True)
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index ef797ca558d..32cdae71277 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -1003,83 +1003,91 @@ def startup(msg: StartupDetails) -> tuple[RuntimeTaskInstance, Context, Logger]:
     return ti, ti.get_template_context(), log
 
 
-def _serialize_template_field(template_field: Any, name: str) -> str | dict | list | int | float:
+def _serialize_template_field(
+    template_field: Any, name: str
+) -> str | dict | list | int | float | bool | None:
     """
     Return a serializable representation of the templated field.
 
-    If ``templated_field`` contains a class or instance that requires recursive
-    templating, store them as strings. Otherwise simply return the field as-is.
+    The walk has two responsibilities:
 
-    Used sdk secrets masker to redact secrets in the serialized output.
+    1. **Make the template_field JSON-encodable** — every container is rebuilt
+       with primitive leaves (str/int/float/bool/None), tuples and sets are
+       flattened to lists, and unsupported objects fall through to ``str()``
+       so ``json.dumps`` never raises on the result.
+    2. **Keep the output deterministic across parses** — callables are replaced
+       with their qualified name (never the default ``<function ... at 0x...>``
+       repr), dicts are key-sorted, and (frozen)sets are sorted by element so
+       the same input always produces the same string.
+
+    Uses the SDK secrets masker to redact secrets in the serialized output.
     """
-    import json
+    import inspect
 
+    from airflow.sdk._shared.module_loading import qualname
     from airflow.sdk._shared.secrets_masker import redact
 
-    def is_jsonable(x):
-        try:
-            json.dumps(x)
-        except (TypeError, OverflowError):
-            return False
-        else:
-            return True
-
-    def translate_tuples_to_lists(obj: Any):
-        """Recursively convert tuples to lists."""
-        if isinstance(obj, tuple):
-            return [translate_tuples_to_lists(item) for item in obj]
-        if isinstance(obj, list):
-            return [translate_tuples_to_lists(item) for item in obj]
-        if isinstance(obj, dict):
-            return {key: translate_tuples_to_lists(value) for key, value in obj.items()}
-        return obj
+    def normalize_dict_key(key) -> str:
+        """Normalize a dict key to a serialized string type."""
+        # Serialized template_field keys must all be strings, not a mix of types, so that
+        # downstream json.dumps(..., sort_keys=True) does not raise on mixed-type keys.
+        return str(serialize_object(key))
+
+    def serialize_object(obj):
+        """Recursively rewrite ``obj`` into a JSON-encodable, hash-stable structure."""
+        if obj is None or isinstance(obj, (str, int, float, bool)):
+            return obj
 
-    def sort_dict_recursively(obj: Any) -> Any:
-        """Recursively sort dictionaries to ensure consistent ordering."""
         if isinstance(obj, dict):
-            return {k: sort_dict_recursively(v) for k, v in sorted(obj.items())}
-        if isinstance(obj, list):
-            return [sort_dict_recursively(item) for item in obj]
-        if isinstance(obj, tuple):
-            return tuple(sort_dict_recursively(item) for item in obj)
-        return obj
-
-    def _fallback_serialization(obj):
-        """Serialize objects with to_dict() method (eg: k8s objects) for json.dumps() default parameter."""
-        if hasattr(obj, "to_dict"):
-            return obj.to_dict()
-        raise TypeError(f"cannot serialize {obj}")
+            # Serialize keys/values first so each key is a string and the output is hash-stable,
+            # then sort by the serialized key to prevent hash inconsistencies when dict ordering varies.
+            serialized_pairs = [(normalize_dict_key(k), serialize_object(v)) for k, v in obj.items()]
+            return dict(sorted(serialized_pairs, key=lambda kv: kv[0]))
+
+        if isinstance(obj, (list, tuple)):
+            return [serialize_object(item) for item in obj]
+
+        if isinstance(obj, (set, frozenset)):
+            # JSON has no set type → flatten to a list with deterministic ordering
+            # so hash randomization on element types cannot shift cross-process iteration order.
+            serialized_set = [serialize_object(e) for e in obj]
+            return sorted(serialized_set, key=lambda x: (type(x).__name__, str(x)))
+
+        # Use inspect.getattr_static to bypass any custom __getattr__ / metaclass magic
+        if callable(inspect.getattr_static(obj, "serialize", None)):
+            return serialize_object(obj.serialize())
+
+        # Kubernetes client objects (V1Pod, V1Container, ...) expose their content via to_dict().
+        # Scope the branch to the kubernetes namespace so unrelated user classes that happen to
+        # define a to_dict() method fall through to str() instead of being treated as K8s payloads.
+        if getattr(type(obj), "__module__", "").startswith(
+            ("kubernetes.", "kubernetes_asyncio.")
+        ) and callable(inspect.getattr_static(obj, "to_dict", None)):
+            return serialize_object(obj.to_dict())
+
+        if callable(obj):
+            # Use qualified name; default repr embeds memory addresses, which would change the DAG hash on every parse
+            return f"<callable {qualname(obj, True)}>"
+
+        # A custom __str__ or __repr__ is treated as an intentional textual representation
+        # supplied by the author and used as-is.
+        if type(obj).__str__ is not object.__str__ or type(obj).__repr__ is not object.__repr__:
+            return str(obj)
+
+        # Otherwise fall back to a qualname marker. The default object repr is
+        # `<ClassName object at 0x...>`, which embeds a memory address that flips per process
+        # and would break DAG hash stability — use the class qualname instead.
+        return f"<{qualname(type(obj), True)} object>"
 
     max_length = conf.getint("core", "max_templated_field_length")
 
-    if not is_jsonable(template_field):
-        try:
-            serialized = template_field.serialize()
-        except AttributeError:
-            # check if these objects can be converted to JSON serializable types
-            try:
-                serialized = json.dumps(template_field, default=_fallback_serialization)
-            except (TypeError, ValueError):
-                # fall back to string representation if not
-                serialized = str(template_field)
-        if len(serialized) > max_length:
-            rendered = redact(serialized, name)
-            return truncate_rendered_value(str(rendered), max_length)
-        return serialized
-    if not template_field and not isinstance(template_field, tuple):
-        # Avoid unnecessary serialization steps for empty fields unless they are tuples
-        # and need to be converted to lists
-        return template_field
-    template_field = translate_tuples_to_lists(template_field)
-    # Sort dictionaries recursively to ensure consistent string representation
-    # This prevents hash inconsistencies when dict ordering varies
-    if isinstance(template_field, dict):
-        template_field = sort_dict_recursively(template_field)
-    serialized = str(template_field)
-    if len(serialized) > max_length:
-        rendered = redact(serialized, name)
+    serialized = serialize_object(template_field)
+
+    if len(str(serialized)) > max_length:
+        rendered = redact(str(serialized), name)
         return truncate_rendered_value(str(rendered), max_length)
-    return template_field
+
+    return serialized
 
 
 def _serialize_rendered_fields(task: AbstractOperator) -> dict[str, JsonValue]:
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index 723ca42d93a..b851d73f744 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -1117,7 +1117,7 @@ def test_basic_templated_dag(mocked_parse, make_ti_context, mock_supervisor_comm
         ),
         pytest.param(
             {"my_tup": (1, 2), "my_set": {1, 2, 3}},
-            {"my_tup": [1, 2], "my_set": "{1, 2, 3}"},
+            {"my_tup": [1, 2], "my_set": [1, 2, 3]},
             id="tuples_and_sets",
         ),
         pytest.param(
@@ -3042,10 +3042,13 @@ class TestRuntimeTaskInstance:
 
         rendered_fields = mock_supervisor_comms.send.mock_calls[0].kwargs["msg"].rendered_fields
         assert rendered_fields is not None
-        assert (
-            rendered_fields["env_vars"]
-            == '[{"name": "var1", "value": "This is a test phrase.", "value_from": null}, {"name": "var2", "value": "***", "value_from": null}, {"name": "var3", "value": "***", "value_from": null}]'
-        )
+        # K8s V1EnvVar objects expose .to_dict(); the recursive walk normalizes the list of objects
+        # into a list of plain dicts so the result is directly JSON-encodable and redact can mask secrets in nested values.
+        assert rendered_fields["env_vars"] == [
+            {"name": "var1", "value": "This is a test phrase.", "value_from": None},
+            {"name": "var2", "value": "***", "value_from": None},
+            {"name": "var3", "value": "***", "value_from": None},
+        ]
 
     def test_nested_template_field_renderer_respects_redaction(
         self, create_runtime_ti, mock_supervisor_comms


Reply via email to