This is an automated email from the ASF dual-hosted git repository.
kaxil pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 764c516b01f Fix DagProcessor crash: add missing name_is_otel_safe() guard to gauge() and timer() (#68284)
764c516b01f is described below
commit 764c516b01f03ec680242f3001b0e538609f0f56
Author: Salil Agrawal <[email protected]>
AuthorDate: Wed Jun 10 17:13:55 2026 +0530
Fix DagProcessor crash: add missing name_is_otel_safe() guard to gauge() and timer() (#68284)
---
.../observability/metrics/otel_logger.py | 25 ++++++++++++++++------
.../observability/metrics/test_otel_logger.py | 25 ++++++++++++++++++++++
2 files changed, 44 insertions(+), 6 deletions(-)
diff --git a/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py b/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py
index 4bd6f116683..9be7103a62b 100644
--- a/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py
+++ b/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py
@@ -192,6 +192,18 @@ class SafeOtelLogger:
self.stat_name_handler = stat_name_handler
self.statsd_influxdb_enabled = statsd_influxdb_enabled
+ def _is_recordable(self, stat: str | None) -> bool:
+ """
+ Return True if ``stat`` may be recorded: non-empty, allowed by the
validator, and OTel-safe.
+
+ Every recording method must gate on this before emitting; otherwise an
unsafe name reaches
+ the OTel SDK, which raises and crashes the emitting process. Keeping
the check in one place
+ stops a new recording method from silently omitting it.
+ """
+ if not stat:
+ return False
+ return self.metrics_validator.test(stat) and
name_is_otel_safe(self.prefix, stat)
+
def incr(
self,
stat: str,
@@ -213,7 +225,7 @@ class SafeOtelLogger:
if count < 0:
raise ValueError("count must be a positive value.")
- if self.metrics_validator.test(stat) and
name_is_otel_safe(self.prefix, stat):
+ if self._is_recordable(stat):
counter =
self.metrics_map.get_counter(full_name(prefix=self.prefix, name=stat),
attributes=tags)
counter.add(count, attributes=tags)
return counter
@@ -239,7 +251,7 @@ class SafeOtelLogger:
if count < 0:
raise ValueError("count must be a positive value.")
- if self.metrics_validator.test(stat) and
name_is_otel_safe(self.prefix, stat):
+ if self._is_recordable(stat):
counter =
self.metrics_map.get_counter(full_name(prefix=self.prefix, name=stat))
counter.add(-count, attributes=tags)
return counter
@@ -270,12 +282,12 @@ class SafeOtelLogger:
if _skip_due_to_rate(rate):
return
- if back_compat_name and self.metrics_validator.test(back_compat_name):
+ if self._is_recordable(back_compat_name):
self.metrics_map.set_gauge_value(
full_name(prefix=self.prefix, name=back_compat_name), value,
delta, tags
)
- if self.metrics_validator.test(stat):
+ if self._is_recordable(stat):
self.metrics_map.set_gauge_value(full_name(prefix=self.prefix,
name=stat), value, delta, tags)
def timing(
@@ -286,7 +298,7 @@ class SafeOtelLogger:
tags: Attributes = None,
) -> None:
"""Record a timing observation as a Histogram to preserve distribution
information."""
- if self.metrics_validator.test(stat) and
name_is_otel_safe(self.prefix, stat):
+ if self._is_recordable(stat):
if isinstance(dt, datetime.timedelta):
dt = dt.total_seconds() * 1000.0
self.metrics_map.record_histogram_value(full_name(prefix=self.prefix,
name=stat), float(dt), tags)
@@ -299,7 +311,8 @@ class SafeOtelLogger:
**kwargs,
) -> Timer:
"""Timer context manager returns the duration and can be cancelled."""
- return _OtelTimer(self, stat, tags)
+ safe_stat = stat if self._is_recordable(stat) else None
+ return _OtelTimer(self, safe_stat, tags)
class InternalGauge:
diff --git a/shared/observability/tests/observability/metrics/test_otel_logger.py b/shared/observability/tests/observability/metrics/test_otel_logger.py
index c5312f509bd..2b5c71193c3 100644
--- a/shared/observability/tests/observability/metrics/test_otel_logger.py
+++ b/shared/observability/tests/observability/metrics/test_otel_logger.py
@@ -121,6 +121,31 @@ class TestOtelMetrics:
assert result is None
self.meter.get_meter().create_counter.assert_not_called()
+ @pytest.mark.parametrize(
+ "stat",
+ [
+ "dag_processing.last_run.seconds_ago.PBI_SKU_Performance copy", #
space in filename
+ "dag_processing.last_run.seconds_ago.mein_däg_file", # non-ASCII
in filename
+ ],
+ )
+ def test_gauge_with_invalid_stat_names_skipped_without_raising(self, stat):
+ self.stats.gauge(stat, value=1)
+
+ self.meter.get_meter().create_gauge.assert_not_called()
+
+ @pytest.mark.parametrize(
+ "stat",
+ [
+ "dag.my_dag.preço_task.duration", # non-ASCII
+ "dag.my_dag.task copy.duration", # space
+ ],
+ )
+ def test_timer_with_invalid_stat_name_does_not_record(self, stat):
+ with self.stats.timer(stat):
+ pass
+
+ self.meter.get_meter().create_histogram.assert_not_called()
+
def test_old_name_exception_works(self, caplog):
name =
"task_instance_created_OperatorNameWhichIsSuperLongAndExceedsTheOpenTelemetryCharacterLimit/task_instance_created_OperatorNameWhichIsSuperLongAndExceedsTheOpenTelemetryCharacterLimit/task_instance_created_OperatorNameWhichIsSuperLongAndExceedsTheOpenTelemetryCharacterLimit"