This is an automated email from the ASF dual-hosted git repository. vatsrahul1001 pushed a commit to branch backport-67527-to-v3-2-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit aa45adb0636ab999a3ee5230ee1731df86a3e961 Author: Rahul Vats <[email protected]> AuthorDate: Tue May 26 12:09:41 2026 +0530 Fix Callback.handle_event crash on OTel metrics with dict tag values (#67527) Fix Callback.handle_event crash on OTel metrics with dict tag values (#67527) (cherry picked from commit 597891128e9a9deb82d121f471830914ed5cb049) --- airflow-core/src/airflow/models/callback.py | 14 +++++++++++++ airflow-core/tests/unit/models/test_callback.py | 28 ++++++++++++++++++++++++- 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/models/callback.py b/airflow-core/src/airflow/models/callback.py index e08d58fa4da..c854b26cc4f 100644 --- a/airflow-core/src/airflow/models/callback.py +++ b/airflow-core/src/airflow/models/callback.py @@ -17,6 +17,7 @@ from __future__ import annotations import inspect +import json from collections.abc import Callable from datetime import datetime from enum import Enum @@ -161,6 +162,19 @@ class Callback(Base, BaseWorkload): # Remove the context (if exists) to keep the tags simple tags["kwargs"] = {k: v for k, v in tags["kwargs"].items() if k != "context"} + # Metric backends (statsd, OpenTelemetry) require tag values to be primitives. + # OTel's aggregation key is built via ``frozenset(attributes.items())``, which + # raises ``TypeError: unhashable type: 'dict'`` if a value is a dict/list. The + # callback's ``result`` (passed in from a user callback) and ``kwargs`` are both + # frequently dicts, so coerce any non-primitive tag value to a JSON string before + # returning. Using ``default=str`` so values like ``datetime`` fall back cleanly. + tags = { + k: v + if isinstance(v, (str, int, float, bool)) or v is None + else json.dumps(v, default=str, sort_keys=True) + for k, v in tags.items() + } + prefix = self.data.get("prefix", "") name = f"{prefix}.callback_{status}" if prefix else f"callback_{status}" diff --git a/airflow-core/tests/unit/models/test_callback.py b/airflow-core/tests/unit/models/test_callback.py index 5d4940b77ad..dbb8680b5a7 100644 --- a/airflow-core/tests/unit/models/test_callback.py +++ b/airflow-core/tests/unit/models/test_callback.py @@ -112,10 +112,36 @@ class TestCallback: assert metric_info["tags"] == { "result": "0", "path": TEST_ASYNC_CALLBACK.path, - "kwargs": {"email": "[email protected]"}, + "kwargs": '{"email": "[email protected]"}', "dag_id": TEST_DAG_ID, } + def test_get_metric_info_dict_values_are_stringified(self): + """ + Regression for ``TypeError: unhashable type: 'dict'`` raised by OpenTelemetry's + ``_view_instrument_match`` when callback metric tags contain dict/list values. + + OTel builds its aggregation key as ``frozenset(attributes.items())``; any tag + value that isn't hashable (dict, list, set) crashes the triggerer when a + callback completes — e.g., deadline async callbacks whose ``result`` is a dict. + """ + callback = TriggererCallback(TEST_ASYNC_CALLBACK, prefix="deadline_alerts", dag_id=TEST_DAG_ID) + callback.data["kwargs"] = {"context": {"dag_id": TEST_DAG_ID}, "nested": {"a": 1}} + + # ``result`` is a dict — exactly the case that surfaced in the deadline DAG. + metric_info = callback.get_metric_info(CallbackState.SUCCESS, {"output": [1, 2], "code": 0}) + + # Every tag value must be a primitive (str/int/float/bool/None) so OTel can hash it. + for k, v in metric_info["tags"].items(): + assert isinstance(v, (str, int, float, bool)) or v is None, ( + f"Tag {k!r}={v!r} is type {type(v).__name__}; must be primitive for OTel." + ) + # ``frozenset(attributes.items())`` must not raise. + frozenset(metric_info["tags"].items()) + # Stringified tag values must be sorted so equivalent kwargs in different + # insertion order collapse to one metric series (no needless cardinality split). + assert metric_info["tags"]["result"] == '{"code": 0, "output": [1, 2]}' + class TestTriggererCallback: def test_polymorphic_serde(self, session):
