This is an automated email from the ASF dual-hosted git repository.
vatsrahul1001 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 597891128e9 Fix Callback.handle_event crash on OTel metrics with dict tag values (#67527)
597891128e9 is described below
commit 597891128e9a9deb82d121f471830914ed5cb049
Author: Rahul Vats <[email protected]>
AuthorDate: Tue May 26 12:09:41 2026 +0530
Fix Callback.handle_event crash on OTel metrics with dict tag values (#67527)
Fix Callback.handle_event crash on OTel metrics with dict tag values (#67527)
---
airflow-core/src/airflow/models/callback.py | 14 +++++++++++++
airflow-core/tests/unit/models/test_callback.py | 28 ++++++++++++++++++++++++-
2 files changed, 41 insertions(+), 1 deletion(-)
diff --git a/airflow-core/src/airflow/models/callback.py b/airflow-core/src/airflow/models/callback.py
index 15f9662cdc8..454057083fb 100644
--- a/airflow-core/src/airflow/models/callback.py
+++ b/airflow-core/src/airflow/models/callback.py
@@ -16,6 +16,7 @@
# under the License.
from __future__ import annotations
+import json
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
@@ -161,6 +162,19 @@ class Callback(Base, BaseWorkload):
# Remove the context (if exists) to keep the tags simple
tags["kwargs"] = {k: v for k, v in tags["kwargs"].items() if k !=
"context"}
+ # Metric backends (statsd, OpenTelemetry) require tag values to be
primitives.
+ # OTel's aggregation key is built via
``frozenset(attributes.items())``, which
+ # raises ``TypeError: unhashable type: 'dict'`` if a value is a
dict/list. The
+ # callback's ``result`` (passed in from a user callback) and
``kwargs`` are both
+ # frequently dicts, so coerce any non-primitive tag value to a JSON
string before
+ # returning. Using ``default=str`` so values like ``datetime`` fall
back cleanly.
+ tags = {
+ k: v
+ if isinstance(v, (str, int, float, bool)) or v is None
+ else json.dumps(v, default=str, sort_keys=True)
+ for k, v in tags.items()
+ }
+
prefix = self.data.get("prefix", "")
name = f"{prefix}.callback_{status}" if prefix else
f"callback_{status}"
diff --git a/airflow-core/tests/unit/models/test_callback.py b/airflow-core/tests/unit/models/test_callback.py
index bb3102decf8..5903f541854 100644
--- a/airflow-core/tests/unit/models/test_callback.py
+++ b/airflow-core/tests/unit/models/test_callback.py
@@ -112,10 +112,36 @@ class TestCallback:
assert metric_info["tags"] == {
"result": "0",
"path": TEST_ASYNC_CALLBACK.path,
- "kwargs": {"email": "[email protected]"},
+ "kwargs": '{"email": "[email protected]"}',
"dag_id": TEST_DAG_ID,
}
+ def test_get_metric_info_dict_values_are_stringified(self):
+ """
+ Regression for ``TypeError: unhashable type: 'dict'`` raised by
OpenTelemetry's
+ ``_view_instrument_match`` when callback metric tags contain dict/list
values.
+
+ OTel builds its aggregation key as ``frozenset(attributes.items())``;
any tag
+ value that isn't hashable (dict, list, set) crashes the triggerer when
a
+ callback completes — e.g., deadline async callbacks whose ``result``
is a dict.
+ """
+ callback = TriggererCallback(TEST_ASYNC_CALLBACK,
prefix="deadline_alerts", dag_id=TEST_DAG_ID)
+ callback.data["kwargs"] = {"context": {"dag_id": TEST_DAG_ID},
"nested": {"a": 1}}
+
+ # ``result`` is a dict — exactly the case that surfaced in the
deadline DAG.
+ metric_info = callback.get_metric_info(CallbackState.SUCCESS,
{"output": [1, 2], "code": 0})
+
+ # Every tag value must be a primitive (str/int/float/bool/None) so
OTel can hash it.
+ for k, v in metric_info["tags"].items():
+ assert isinstance(v, (str, int, float, bool)) or v is None, (
+ f"Tag {k!r}={v!r} is type {type(v).__name__}; must be
primitive for OTel."
+ )
+ # ``frozenset(attributes.items())`` must not raise.
+ frozenset(metric_info["tags"].items())
+ # Stringified tag values must be sorted so equivalent kwargs in
different
+ # insertion order collapse to one metric series (no needless
cardinality split).
+ assert metric_info["tags"]["result"] == '{"code": 0, "output": [1, 2]}'
+
class TestTriggererCallback:
def test_polymorphic_serde(self, session):