This is an automated email from the ASF dual-hosted git repository. vatsrahul1001 pushed a commit to branch fix-callback-otel-unhashable-dict in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 6097e9dd405fef163eae4d1717e544f24b5a5d50 Author: vatsrahul1001 <[email protected]> AuthorDate: Tue May 26 10:55:51 2026 +0530 Fix Callback.handle_event crash on OTel metrics with dict tag values The triggerer crashes on the next deadline async callback when OpenTelemetry metrics are enabled: File ".../airflow/jobs/triggerer_job_runner.py", line 659, in handle_events Trigger.submit_event(...) File ".../airflow/models/callback.py", line 234, in handle_event Stats.incr(**self.get_metric_info(status, self.output)) File ".../airflow/_shared/observability/metrics/otel_logger.py", line 211, in incr counter.add(count, attributes=tags) File ".../opentelemetry/sdk/metrics/.../view_instrument_match.py", line 105 aggr_key = frozenset(attributes.items()) TypeError: unhashable type: 'dict' `Callback.get_metric_info` builds the metric tags dict directly from the callback's `result` and `self.data` (which includes `kwargs`). Both are frequently dicts — for deadline async callbacks the `result` is the user callback's return value, and `kwargs` is the captured callback kwargs. When the metrics backend is OTel, the SDK builds the aggregation key as `frozenset(attributes.items())`, which raises if any value is unhashable (dict, list, set). The result is a triggerer crash and stalled triggers. The bug is metrics-backend-dependent: statsd accepts non-primitive tag values without complaint, so OSS users running default statsd never see it. OTel backends (used in production by Astronomer Astro Cloud and any OSS deployment that enables `[metrics] otel_*`) hit it consistently. The crash reproduces against 3.2.1 and main; it is not a 3.2.x regression. Sanitize tag values to primitives before returning from `get_metric_info`: keep `str | int | float | bool | None` as-is, JSON-stringify anything else. `json.dumps` is called with `default=str` so that values like `datetime` fall back to their string form instead of raising. 
Adds a regression test that asserts every tag value is hashable and that `frozenset(tags.items())` does not raise. Reported by Astronomer Runtime team while testing 3.2.2rc2-based images. --- airflow-core/src/airflow/models/callback.py | 12 +++++++++++ airflow-core/tests/unit/models/test_callback.py | 27 ++++++++++++++++++++++++- 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/models/callback.py b/airflow-core/src/airflow/models/callback.py index 15f9662cdc8..d9b58690e30 100644 --- a/airflow-core/src/airflow/models/callback.py +++ b/airflow-core/src/airflow/models/callback.py @@ -16,6 +16,7 @@ # under the License. from __future__ import annotations +import json from dataclasses import dataclass from datetime import datetime from enum import Enum @@ -161,6 +162,17 @@ class Callback(Base, BaseWorkload): # Remove the context (if exists) to keep the tags simple tags["kwargs"] = {k: v for k, v in tags["kwargs"].items() if k != "context"} + # Metric backends (statsd, OpenTelemetry) require tag values to be primitives. + # OTel's aggregation key is built via ``frozenset(attributes.items())``, which + # raises ``TypeError: unhashable type: 'dict'`` if a value is a dict/list. The + # callback's ``result`` (passed in from a user callback) and ``kwargs`` are both + # frequently dicts, so coerce any non-primitive tag value to a JSON string before + # returning. Using ``default=str`` so values like ``datetime`` fall back cleanly. 
+ tags = { + k: v if isinstance(v, (str, int, float, bool)) or v is None else json.dumps(v, default=str) + for k, v in tags.items() + } + prefix = self.data.get("prefix", "") name = f"{prefix}.callback_{status}" if prefix else f"callback_{status}" diff --git a/airflow-core/tests/unit/models/test_callback.py b/airflow-core/tests/unit/models/test_callback.py index bb3102decf8..fbfdd233aaa 100644 --- a/airflow-core/tests/unit/models/test_callback.py +++ b/airflow-core/tests/unit/models/test_callback.py @@ -109,13 +109,38 @@ class TestCallback: metric_info = callback.get_metric_info(CallbackState.SUCCESS, "0") assert metric_info["stat"] == "deadline_alerts.callback_success" + # kwargs is JSON-stringified so non-primitive tag values don't crash OTel + # (see test_get_metric_info_dict_values_are_stringified for the regression). assert metric_info["tags"] == { "result": "0", "path": TEST_ASYNC_CALLBACK.path, - "kwargs": {"email": "[email protected]"}, + "kwargs": '{"email": "[email protected]"}', "dag_id": TEST_DAG_ID, } + def test_get_metric_info_dict_values_are_stringified(self): + """ + Regression for ``TypeError: unhashable type: 'dict'`` raised by OpenTelemetry's + ``_view_instrument_match`` when callback metric tags contain dict/list values. + + OTel builds its aggregation key as ``frozenset(attributes.items())``; any tag + value that isn't hashable (dict, list, set) crashes the triggerer when a + callback completes — e.g., deadline async callbacks whose ``result`` is a dict. + """ + callback = TriggererCallback(TEST_ASYNC_CALLBACK, prefix="deadline_alerts", dag_id=TEST_DAG_ID) + callback.data["kwargs"] = {"context": {"dag_id": TEST_DAG_ID}, "nested": {"a": 1}} + + # ``result`` is a dict — exactly the case that surfaced in the deadline DAG. + metric_info = callback.get_metric_info(CallbackState.SUCCESS, {"output": [1, 2], "code": 0}) + + # Every tag value must be a primitive (str/int/float/bool/None) so OTel can hash it. 
+ for k, v in metric_info["tags"].items(): + assert isinstance(v, (str, int, float, bool)) or v is None, ( + f"Tag {k!r}={v!r} is type {type(v).__name__}; must be primitive for OTel." + ) + # ``frozenset(attributes.items())`` must not raise. + frozenset(metric_info["tags"].items()) + class TestTriggererCallback: def test_polymorphic_serde(self, session):
