ephraimbuddy commented on code in PR #63871:
URL: https://github.com/apache/airflow/pull/63871#discussion_r3233165043
##########
airflow-core/src/airflow/serialization/helpers.py:
##########
@@ -19,87 +19,88 @@
from __future__ import annotations
import contextlib
+import inspect
from typing import TYPE_CHECKING, Any
from airflow._shared.module_loading import qualname
from airflow._shared.secrets_masker import redact
from airflow._shared.template_rendering import truncate_rendered_value
from airflow.configuration import conf
-from airflow.settings import json
if TYPE_CHECKING:
from airflow.partition_mappers.base import PartitionMapper
from airflow.timetables.base import Timetable as CoreTimetable
-def serialize_template_field(template_field: Any, name: str) -> str | dict |
list | int | float:
+def serialize_template_field(template_field: Any, name: str) -> str | dict |
list | int | float | bool | None:
"""
Return a serializable representation of the templated field.
- If ``templated_field`` is provided via a callable then
- return the following serialized value: ``<callable full_qualified_name>``
+ The walk has two responsibilities:
- If ``templated_field`` contains a class or instance that requires recursive
- templating, store them as strings. Otherwise simply return the field as-is.
+ 1. **Make the template_field JSON-encodable** — every container is rebuilt
+ with primitive leaves (str/int/float/bool/None), tuples and sets are
+ flattened to lists, and unsupported objects fall through to ``str()``
+ so ``json.dumps`` never raises on the result.
+ 2. **Keep the output deterministic across parses** — callables are replaced
+ with their qualified name (never the default ``<function ... at 0x...>``
+ repr), dicts are key-sorted, and (frozen)sets are sorted by element so
+ the same input always produces the same string.
"""
- def is_jsonable(x):
- try:
- json.dumps(x)
- except (TypeError, OverflowError):
- return False
- else:
- return True
-
- def translate_tuples_to_lists(obj: Any):
- """Recursively convert tuples to lists."""
- if isinstance(obj, tuple):
- return [translate_tuples_to_lists(item) for item in obj]
- if isinstance(obj, list):
- return [translate_tuples_to_lists(item) for item in obj]
- if isinstance(obj, dict):
- return {key: translate_tuples_to_lists(value) for key, value in
obj.items()}
- return obj
+ def normalize_dict_key(key) -> str:
+ """Normalize a dict key to a serialized string type."""
+ # Serialized template_field keys must all be strings, not a mix of
types, so that
+ # downstream json.dumps(..., sort_keys=True) does not raise on
mixed-type keys.
+ return str(serialize_object(key))
+
+ def serialize_object(obj):
+ """Recursively rewrite ``obj`` into a JSON-encodable, hash-stable
structure."""
+ if obj is None or isinstance(obj, (str, int, float, bool)):
+ return obj
- def sort_dict_recursively(obj: Any) -> Any:
- """Recursively sort dictionaries to ensure consistent ordering."""
if isinstance(obj, dict):
- return {k: sort_dict_recursively(v) for k, v in
sorted(obj.items())}
- if isinstance(obj, list):
- return [sort_dict_recursively(item) for item in obj]
- if isinstance(obj, tuple):
- return tuple(sort_dict_recursively(item) for item in obj)
- return obj
+ # Serialize keys/values first so each key is a string and the
output is hash-stable,
+ # then sort by the serialized key to prevents hash inconsistencies
when dict ordering varies.
+ serialized_pairs = [(normalize_dict_key(k), serialize_object(v))
for k, v in obj.items()]
+ return dict(sorted(serialized_pairs, key=lambda kv: kv[0]))
+
+ if isinstance(obj, (list, tuple)):
+ return [serialize_object(item) for item in obj]
+
+ if isinstance(obj, (set, frozenset)):
+ # JSON has no set type → flatten to a list with deterministic
ordering
+ # so hash randomization on element types cannot shift
cross-process iteration order.
+ return sorted(
+ (serialize_object(item) for item in obj),
+ key=lambda x: (type(x).__name__, str(x)),
+ )
+
+ # Use inspect.getattr_static to bypass any custom __getattr__ /
metaclass magic
+ if callable(inspect.getattr_static(obj, "serialize", None)):
+ return serialize_object(obj.serialize())
+
+ # Kubernetes client objects (V1Pod, V1Container, ...) expose their
content via to_dict()
+ if callable(inspect.getattr_static(obj, "to_dict", None)):
+ return serialize_object(obj.to_dict())
Review Comment:
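This check looks too broad: any object that happens to define a `to_dict()`
method would be converted here, not just Kubernetes client models. Should we
restrict it to Kubernetes objects, e.g. with
`inspect.getmodule(obj).__name__.startswith("kubernetes.")` (keeping in mind
that `inspect.getmodule` can return `None`)? Or should we move this check after
the `callable(obj)` check below?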
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]