This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-2-test by this push:
new d4217f4038f Fix OTel timer metrics using Gauge instead of Histogram
(#64207) (#66865)
d4217f4038f is described below
commit d4217f4038f63420cb27533336c94b22077afd6c
Author: Rahul Vats <[email protected]>
AuthorDate: Thu May 14 00:20:58 2026 +0530
Fix OTel timer metrics using Gauge instead of Histogram (#64207) (#66865)
* Fix OTel timer metrics using Gauge instead of Histogram
* Use ExponentialBucketHistogramAggregation for timing metrics
* Use public API import path for ExponentialBucketHistogramAggregation and
fix histogram map isolation
(cherry picked from commit b2dadd2b7623d0d99f6fea0521bf008b7b957cac)
Co-authored-by: namratachaudhary <[email protected]>
---
airflow-core/newsfragments/64207.significant.rst | 1 +
.../observability/metrics/otel_logger.py | 58 +++++++++++++++++++---
.../observability/metrics/test_otel_logger.py | 47 ++++++++++++------
3 files changed, 86 insertions(+), 20 deletions(-)
diff --git a/airflow-core/newsfragments/64207.significant.rst
b/airflow-core/newsfragments/64207.significant.rst
new file mode 100644
index 00000000000..3254fa20a54
--- /dev/null
+++ b/airflow-core/newsfragments/64207.significant.rst
@@ -0,0 +1 @@
+OTel timer and timing metrics now use Histogram instead of Gauge, preserving
count, sum, and bucket distribution across recordings.
diff --git
a/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py
b/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py
index c8db3ee02f9..b7b8effe8dc 100644
---
a/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py
+++
b/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py
@@ -30,6 +30,7 @@ from opentelemetry.sdk.metrics._internal.export import (
ConsoleMetricExporter,
PeriodicExportingMetricReader,
)
+from opentelemetry.sdk.metrics.view import
ExponentialBucketHistogramAggregation, View
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from ..common import get_otel_data_exporter
@@ -146,7 +147,8 @@ class _OtelTimer(Timer):
"""
An implementation of Stats.Timer() which records the result in the OTel
Metrics Map.
- OpenTelemetry does not have a native timer, we will store the values as a
Gauge.
+ OpenTelemetry does not have a native timer; values are stored as a
Histogram so that
+ all observations (count, sum, bucket distribution) are preserved across
multiple recordings.
:param name: The name of the timer.
:param tags: Tags to append to the timer.
@@ -160,9 +162,9 @@ class _OtelTimer(Timer):
def stop(self, send: bool = True) -> None:
super().stop(send)
- if self.name and send and self.duration:
- self.otel_logger.metrics_map.set_gauge_value(
- full_name(prefix=self.otel_logger.prefix, name=self.name),
self.duration, False, self.tags
+ if self.name and send and self.duration is not None:
+ self.otel_logger.metrics_map.record_histogram_value(
+ full_name(prefix=self.otel_logger.prefix, name=self.name),
self.duration, self.tags
)
@@ -278,11 +280,11 @@ class SafeOtelLogger:
*,
tags: Attributes = None,
) -> None:
- """OTel does not have a native timer, stored as a Gauge whose value is
elapsed ms."""
+ """Record a timing observation as a Histogram to preserve distribution
information."""
if self.metrics_validator.test(stat) and
name_is_otel_safe(self.prefix, stat):
if isinstance(dt, datetime.timedelta):
dt = dt.total_seconds() * 1000.0
- self.metrics_map.set_gauge_value(full_name(prefix=self.prefix,
name=stat), float(dt), False, tags)
+
self.metrics_map.record_histogram_value(full_name(prefix=self.prefix,
name=stat), float(dt), tags)
def timer(
self,
@@ -314,15 +316,29 @@ class InternalGauge:
self.gauge.set(new_value, attributes=self.attributes)
+class InternalHistogram:
+ """Stores a histogram instrument for timer/timing metrics."""
+
+ def __init__(self, meter, name: str):
+ otel_safe_name = _get_otel_safe_name(name)
+ self.histogram = meter.create_histogram(name=otel_safe_name, unit="ms")
+ log.debug("Created %s as type: %s", otel_safe_name,
_type_as_str(self.histogram))
+
+ def record(self, value: float, tags: Attributes) -> None:
+ self.histogram.record(value, attributes=tags)
+
+
class MetricsMap:
"""Stores Otel Instruments."""
def __init__(self, meter):
self.meter = meter
self.map = {}
+ self.histograms: dict[str, InternalHistogram] = {}
def clear(self) -> None:
self.map.clear()
+ self.histograms.clear()
def _create_counter(self, name):
"""Create a new counter or up_down_counter for the provided name."""
@@ -376,6 +392,21 @@ class MetricsMap:
self.map[key].set_value(value, delta)
+ def record_histogram_value(self, name: str, value: float, tags:
Attributes) -> None:
+ """
+ Record a timing observation in a Histogram instrument.
+
+ Unlike a Gauge, a Histogram accumulates all observations so that
count, sum,
+ and bucket distribution are preserved across multiple recordings.
+
+ :param name: The name of the histogram to record.
+ :param value: The timing observation in milliseconds.
+ :param tags: Attributes to attach to the observation.
+ """
+ if name not in self.histograms:
+ self.histograms[name] = InternalHistogram(meter=self.meter,
name=name)
+ self.histograms[name].record(value, tags)
+
def flush_otel_metrics():
provider = metrics.get_meter_provider()
@@ -400,6 +431,15 @@ def get_otel_logger(
stat_name_handler: Callable[[str], str] | None = None,
statsd_influxdb_enabled: bool = False,
) -> SafeOtelLogger:
+ """
+ Build and return a :class:`SafeOtelLogger` backed by a configured
:class:`MeterProvider`.
+
+ Histogram instruments (used for ``timing()`` / ``timer()`` metrics) are
aggregated with
+
:class:`~opentelemetry.sdk.metrics.view.ExponentialBucketHistogramAggregation`
+ so that bucket boundaries adapt automatically to the observed data range.
This avoids
+ the need to hand-tune explicit bucket boundaries for metrics that span
very different
+ scales (milliseconds to hours).
+ """
otel_env_config = load_metrics_env_config()
effective_service_name: str = otel_env_config.service_name or service_name
or "airflow"
@@ -453,6 +493,12 @@ def get_otel_logger(
MeterProvider(
resource=resource,
metric_readers=readers,
+ views=[
+ View(
+ instrument_type=metrics.Histogram,
+ aggregation=ExponentialBucketHistogramAggregation(),
+ )
+ ],
shutdown_on_exit=False,
),
)
diff --git
a/shared/observability/tests/observability/metrics/test_otel_logger.py
b/shared/observability/tests/observability/metrics/test_otel_logger.py
index f7b348354d7..3c1369ec44b 100644
--- a/shared/observability/tests/observability/metrics/test_otel_logger.py
+++ b/shared/observability/tests/observability/metrics/test_otel_logger.py
@@ -25,6 +25,7 @@ from unittest import mock
import pytest
from opentelemetry.metrics import MeterProvider
+from opentelemetry.sdk.metrics.view import
ExponentialBucketHistogramAggregation, View
from airflow_shared.observability.common import get_otel_data_exporter
from airflow_shared.observability.exceptions import InvalidStatsNameException
@@ -244,25 +245,28 @@ class TestOtelMetrics:
self.stats.timing(name, dt=datetime.timedelta(seconds=123))
-
self.meter.get_meter().create_gauge.assert_called_once_with(name=full_name(name))
- expected_value = 123000.0
- assert self.map[full_name(name)].value == expected_value
+
self.meter.get_meter().create_histogram.assert_called_once_with(name=full_name(name),
unit="ms")
+
self.meter.get_meter().create_histogram.return_value.record.assert_called_once_with(
+ 123000.0, attributes=None
+ )
def test_timing_new_metric_with_tags(self, name):
tags = {"hello": "world"}
- key = _generate_key_name(full_name(name), tags)
self.stats.timing(name, dt=1, tags=tags)
-
self.meter.get_meter().create_gauge.assert_called_once_with(name=full_name(name))
- self.map[key].attributes == tags
+
self.meter.get_meter().create_histogram.assert_called_once_with(name=full_name(name),
unit="ms")
+
self.meter.get_meter().create_histogram.return_value.record.assert_called_once_with(
+ 1.0, attributes=tags
+ )
def test_timing_existing_metric(self, name):
self.stats.timing(name, dt=1)
self.stats.timing(name, dt=2)
-
self.meter.get_meter().create_gauge.assert_called_once_with(name=full_name(name))
- assert self.map[full_name(name)].value == 2
+ # histogram created only once, but both observations are recorded
+
self.meter.get_meter().create_histogram.assert_called_once_with(name=full_name(name),
unit="ms")
+ assert
self.meter.get_meter().create_histogram.return_value.record.call_count == 2
# For the four test_timer_foo tests below:
# time.perf_counter() is called once to get the starting timestamp and
again
@@ -277,7 +281,7 @@ class TestOtelMetrics:
expected_duration = 3140.0
assert timer.duration == expected_duration
assert mock_time.call_count == 2
-
self.meter.get_meter().create_gauge.assert_called_once_with(name=full_name(name))
+
self.meter.get_meter().create_histogram.assert_called_once_with(name=full_name(name),
unit="ms")
@mock.patch.object(time, "perf_counter", side_effect=[0.0, 3.14])
def test_timer_no_name_returns_float_but_does_not_store_value(self,
mock_time, name):
@@ -288,7 +292,7 @@ class TestOtelMetrics:
expected_duration = 3140.0
assert timer.duration == expected_duration
assert mock_time.call_count == 2
- self.meter.get_meter().create_gauge.assert_not_called()
+ self.meter.get_meter().create_histogram.assert_not_called()
@mock.patch.object(time, "perf_counter", side_effect=[0.0, 3.14])
def test_timer_start_and_stop_manually_send_false(self, mock_time, name):
@@ -301,7 +305,7 @@ class TestOtelMetrics:
expected_value = 3140.0
assert timer.duration == expected_value
assert mock_time.call_count == 2
- self.meter.get_meter().create_gauge.assert_not_called()
+ self.meter.get_meter().create_histogram.assert_not_called()
@mock.patch.object(time, "perf_counter", side_effect=[0.0, 3.14])
def test_timer_start_and_stop_manually_send_true(self, mock_time, name):
@@ -314,7 +318,7 @@ class TestOtelMetrics:
expected_value = 3140.0
assert timer.duration == expected_value
assert mock_time.call_count == 2
-
self.meter.get_meter().create_gauge.assert_called_once_with(name=full_name(name))
+
self.meter.get_meter().create_histogram.assert_called_once_with(name=full_name(name),
unit="ms")
@pytest.mark.parametrize(
(
@@ -415,6 +419,18 @@ class TestOtelMetrics:
==
f"opentelemetry.exporter.otlp.proto.{expected_exporter_module}.metric_exporter"
)
+ @mock.patch("airflow_shared.observability.metrics.otel_logger.metrics")
+
@mock.patch("airflow_shared.observability.metrics.otel_logger.MeterProvider")
+ def test_get_otel_logger_uses_exponential_histogram_view(self,
mock_provider, mock_metrics):
+ get_otel_logger(host="localhost", port=4318)
+
+ call_kwargs = mock_provider.call_args.kwargs
+ views = call_kwargs["views"]
+ assert len(views) == 1
+ view = views[0]
+ assert isinstance(view, View)
+ assert isinstance(view._aggregation,
ExponentialBucketHistogramAggregation)
+
def test_atexit_flush_on_process_exit(self):
"""
Run a process that initializes a logger, creates a stat and then exits.
@@ -422,8 +438,11 @@ class TestOtelMetrics:
The logger initialization registers an atexit hook.
Test that the hook runs and flushes the created stat at shutdown.
"""
- test_module_name = "tests.observability.metrics.test_otel_logger"
- function_call_str = f"import {test_module_name} as m;
m.mock_service_run()"
+ function_call_str = (
+ "from airflow_shared.observability.metrics.otel_logger import
get_otel_logger; "
+ "logger = get_otel_logger(debug=True); "
+ "logger.incr('my_test_stat')"
+ )
proc = subprocess.run(
[sys.executable, "-c", function_call_str],