This is an automated email from the ASF dual-hosted git repository.

kaxil pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 764c516b01f Fix DagProcessor crash: add missing name_is_otel_safe() guard to gauge() and timer() (#68284)
764c516b01f is described below

commit 764c516b01f03ec680242f3001b0e538609f0f56
Author: Salil Agrawal <[email protected]>
AuthorDate: Wed Jun 10 17:13:55 2026 +0530

    Fix DagProcessor crash: add missing name_is_otel_safe() guard to gauge() and timer() (#68284)
---
 .../observability/metrics/otel_logger.py           | 25 ++++++++++++++++------
 .../observability/metrics/test_otel_logger.py      | 25 ++++++++++++++++++++++
 2 files changed, 44 insertions(+), 6 deletions(-)

diff --git a/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py b/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py
index 4bd6f116683..9be7103a62b 100644
--- a/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py
+++ b/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py
@@ -192,6 +192,18 @@ class SafeOtelLogger:
         self.stat_name_handler = stat_name_handler
         self.statsd_influxdb_enabled = statsd_influxdb_enabled
 
+    def _is_recordable(self, stat: str | None) -> bool:
+        """
+        Return True if ``stat`` may be recorded: non-empty, allowed by the 
validator, and OTel-safe.
+
+        Every recording method must gate on this before emitting; otherwise an 
unsafe name reaches
+        the OTel SDK, which raises and crashes the emitting process. Keeping 
the check in one place
+        stops a new recording method from silently omitting it.
+        """
+        if not stat:
+            return False
+        return self.metrics_validator.test(stat) and 
name_is_otel_safe(self.prefix, stat)
+
     def incr(
         self,
         stat: str,
@@ -213,7 +225,7 @@ class SafeOtelLogger:
         if count < 0:
             raise ValueError("count must be a positive value.")
 
-        if self.metrics_validator.test(stat) and 
name_is_otel_safe(self.prefix, stat):
+        if self._is_recordable(stat):
             counter = 
self.metrics_map.get_counter(full_name(prefix=self.prefix, name=stat), 
attributes=tags)
             counter.add(count, attributes=tags)
             return counter
@@ -239,7 +251,7 @@ class SafeOtelLogger:
         if count < 0:
             raise ValueError("count must be a positive value.")
 
-        if self.metrics_validator.test(stat) and 
name_is_otel_safe(self.prefix, stat):
+        if self._is_recordable(stat):
             counter = 
self.metrics_map.get_counter(full_name(prefix=self.prefix, name=stat))
             counter.add(-count, attributes=tags)
             return counter
@@ -270,12 +282,12 @@ class SafeOtelLogger:
         if _skip_due_to_rate(rate):
             return
 
-        if back_compat_name and self.metrics_validator.test(back_compat_name):
+        if self._is_recordable(back_compat_name):
             self.metrics_map.set_gauge_value(
                 full_name(prefix=self.prefix, name=back_compat_name), value, 
delta, tags
             )
 
-        if self.metrics_validator.test(stat):
+        if self._is_recordable(stat):
             self.metrics_map.set_gauge_value(full_name(prefix=self.prefix, 
name=stat), value, delta, tags)
 
     def timing(
@@ -286,7 +298,7 @@ class SafeOtelLogger:
         tags: Attributes = None,
     ) -> None:
         """Record a timing observation as a Histogram to preserve distribution 
information."""
-        if self.metrics_validator.test(stat) and 
name_is_otel_safe(self.prefix, stat):
+        if self._is_recordable(stat):
             if isinstance(dt, datetime.timedelta):
                 dt = dt.total_seconds() * 1000.0
             
self.metrics_map.record_histogram_value(full_name(prefix=self.prefix, 
name=stat), float(dt), tags)
@@ -299,7 +311,8 @@ class SafeOtelLogger:
         **kwargs,
     ) -> Timer:
         """Timer context manager returns the duration and can be cancelled."""
-        return _OtelTimer(self, stat, tags)
+        safe_stat = stat if self._is_recordable(stat) else None
+        return _OtelTimer(self, safe_stat, tags)
 
 
 class InternalGauge:
diff --git a/shared/observability/tests/observability/metrics/test_otel_logger.py b/shared/observability/tests/observability/metrics/test_otel_logger.py
index c5312f509bd..2b5c71193c3 100644
--- a/shared/observability/tests/observability/metrics/test_otel_logger.py
+++ b/shared/observability/tests/observability/metrics/test_otel_logger.py
@@ -121,6 +121,31 @@ class TestOtelMetrics:
         assert result is None
         self.meter.get_meter().create_counter.assert_not_called()
 
+    @pytest.mark.parametrize(
+        "stat",
+        [
+            "dag_processing.last_run.seconds_ago.PBI_SKU_Performance copy",  # 
space in filename
+            "dag_processing.last_run.seconds_ago.mein_däg_file",  # non-ASCII 
in filename
+        ],
+    )
+    def test_gauge_with_invalid_stat_names_skipped_without_raising(self, stat):
+        self.stats.gauge(stat, value=1)
+
+        self.meter.get_meter().create_gauge.assert_not_called()
+
+    @pytest.mark.parametrize(
+        "stat",
+        [
+            "dag.my_dag.preço_task.duration",  # non-ASCII
+            "dag.my_dag.task copy.duration",  # space
+        ],
+    )
+    def test_timer_with_invalid_stat_name_does_not_record(self, stat):
+        with self.stats.timer(stat):
+            pass
+
+        self.meter.get_meter().create_histogram.assert_not_called()
+
     def test_old_name_exception_works(self, caplog):
         name = 
"task_instance_created_OperatorNameWhichIsSuperLongAndExceedsTheOpenTelemetryCharacterLimit/task_instance_created_OperatorNameWhichIsSuperLongAndExceedsTheOpenTelemetryCharacterLimit/task_instance_created_OperatorNameWhichIsSuperLongAndExceedsTheOpenTelemetryCharacterLimit"
 

Reply via email to