This is an automated email from the ASF dual-hosted git repository. vatsrahul1001 pushed a commit to branch v3-2-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 40a70419afbf7794789f5fa653e48f28ea2ba69b Author: Yeonguk Choo <[email protected]> AuthorDate: Fri May 22 00:44:27 2026 +0900 [v3-2-test] Prevent AlreadyRunningBackfill error caused by invalid date range request (#66874) (#67250) * Fix OTel timer metrics using Gauge instead of Histogram (#64207) (#66865) * Fix OTel timer metrics using Gauge instead of Histogram * Use ExponentialBucketHistogramAggregation for timing metrics * Use public API import path for ExponentialBucketHistogramAggregation and fix histogram map isolation (cherry picked from commit b2dadd2b7623d0d99f6fea0521bf008b7b957cac) Co-authored-by: namratachaudhary <[email protected]> * [v3-2-test] Prevent AlreadyRunningBackfill error caused by invalid date range request (#66874) When a backfill is requested with from_date after to_date, the Backfill record was committed before _get_info_list() returned an empty list, leaving an orphaned record that blocked subsequent backfills with AlreadyRunningBackfill until the scheduler's 2-minute cleanup ran. Add an InvalidBackfillDateRange exception and validate from_date <= to_date at the top of _validate_backfill_params(), before any DB operations. (cherry picked from commit 153623856efb44a3f60ef9fdb67e22e05609e067) --------- Co-authored-by: Rahul Vats <[email protected]> Co-authored-by: namratachaudhary <[email protected]> Co-authored-by: Park Jiwon <[email protected]> --- airflow-core/newsfragments/64207.significant.rst | 1 + .../core_api/routes/public/backfills.py | 3 ++ airflow-core/src/airflow/models/backfill.py | 21 ++++++-- airflow-core/tests/unit/models/test_backfill.py | 18 +++++++ .../observability/metrics/otel_logger.py | 58 +++++++++++++++++++--- .../observability/metrics/test_otel_logger.py | 47 ++++++++++++------ 6 files changed, 125 insertions(+), 23 deletions(-) diff --git a/airflow-core/newsfragments/64207.significant.rst b/airflow-core/newsfragments/64207.significant.rst new file mode 100644 index 00000000000..3254fa20a54 --- /dev/null +++ b/airflow-core/newsfragments/64207.significant.rst @@ -0,0 +1 @@ +OTel timer and timing metrics now use Histogram instead of Gauge, preserving count, sum, and bucket distribution across recordings. diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py index 5d4e112df18..d6373f1d52a 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py @@ -52,6 +52,7 @@ from airflow.models.backfill import ( DagNoScheduleException, InvalidBackfillConf, InvalidBackfillDate, + InvalidBackfillDateRange, InvalidBackfillDirection, InvalidReprocessBehavior, _create_backfill, @@ -265,6 +266,7 @@ def create_backfill( InvalidBackfillDirection, DagNoScheduleException, InvalidBackfillDate, + InvalidBackfillDateRange, InvalidBackfillConf, ) as e: raise RequestValidationError(str(e)) @@ -314,6 +316,7 @@ def create_backfill_dry_run( InvalidBackfillDirection, DagNoScheduleException, InvalidBackfillDate, + InvalidBackfillDateRange, InvalidBackfillConf, ) as e: raise RequestValidationError(str(e)) diff --git a/airflow-core/src/airflow/models/backfill.py b/airflow-core/src/airflow/models/backfill.py index 24c102f1cfc..4f4dba38bf0 100644 --- a/airflow-core/src/airflow/models/backfill.py +++ b/airflow-core/src/airflow/models/backfill.py @@ -100,6 +100,14 @@ class InvalidBackfillDate(AirflowException): """ +class InvalidBackfillDateRange(AirflowException): + """ + Raised when from_date is after to_date in a backfill request. + + :meta private: + """ + + class InvalidBackfillConf(AirflowException): """ Raised when the provided ``dag_run_conf`` fails validation against the DAG's params. @@ -259,6 +267,16 @@ def _validate_backfill_params( reprocess_behavior: ReprocessBehavior | None, dag_run_conf: dict | None = None, ) -> None: + + if from_date > to_date: + raise InvalidBackfillDateRange( + f"from_date ({from_date.isoformat()}) must not be after to_date ({to_date.isoformat()})." + ) + + current_time = timezone.utcnow() + if from_date >= current_time and to_date >= current_time: + raise InvalidBackfillDate("Backfill cannot be executed for future dates.") + depends_on_past = any(x.depends_on_past for x in dag.tasks) if depends_on_past: if reverse is True: @@ -270,9 +288,6 @@ def _validate_backfill_params( "Dag has tasks for which depends_on_past=True. " "You must set reprocess behavior to reprocess completed or reprocess failed." ) - current_time = timezone.utcnow() - if from_date >= current_time and to_date >= current_time: - raise InvalidBackfillDate("Backfill cannot be executed for future dates.") if dag_run_conf is not None: try: dag.params.deep_merge(dag_run_conf).validate() diff --git a/airflow-core/tests/unit/models/test_backfill.py b/airflow-core/tests/unit/models/test_backfill.py index 2749e0e6776..67b2b727a76 100644 --- a/airflow-core/tests/unit/models/test_backfill.py +++ b/airflow-core/tests/unit/models/test_backfill.py @@ -33,6 +33,7 @@ from airflow.models.backfill import ( BackfillDagRun, BackfillDagRunExceptionReason, InvalidBackfillConf, + InvalidBackfillDateRange, InvalidBackfillDirection, InvalidReprocessBehavior, ReprocessBehavior, @@ -779,3 +780,20 @@ def test_get_latest_dag_run_row_partitioned(session: Session): dr = session.scalar(stmt) assert dr is not None assert dr.start_date == timezone.parse("2026-02-23") + + +def test_create_backfill_from_date_after_to_date_raises(dag_maker, session): + with dag_maker(schedule="@daily") as dag: + PythonOperator(task_id="hi", python_callable=print) + session.commit() + + with pytest.raises(InvalidBackfillDateRange, match="must not be after to_date"): + _create_backfill( + dag_id=dag.dag_id, + from_date=pendulum.parse("2026-05-13"), + to_date=pendulum.parse("2026-05-12"), + max_active_runs=2, + reverse=False, + triggering_user_name="pytest", + dag_run_conf={}, + ) diff --git a/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py b/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py index c8db3ee02f9..b7b8effe8dc 100644 --- a/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py +++ b/shared/observability/src/airflow_shared/observability/metrics/otel_logger.py @@ -30,6 +30,7 @@ from opentelemetry.sdk.metrics._internal.export import ( ConsoleMetricExporter, PeriodicExportingMetricReader, ) +from opentelemetry.sdk.metrics.view import ExponentialBucketHistogramAggregation, View from opentelemetry.sdk.resources import SERVICE_NAME, Resource from ..common import get_otel_data_exporter @@ -146,7 +147,8 @@ class _OtelTimer(Timer): """ An implementation of Stats.Timer() which records the result in the OTel Metrics Map. - OpenTelemetry does not have a native timer, we will store the values as a Gauge. + OpenTelemetry does not have a native timer; values are stored as a Histogram so that + all observations (count, sum, bucket distribution) are preserved across multiple recordings. :param name: The name of the timer. :param tags: Tags to append to the timer. @@ -160,9 +162,9 @@ class _OtelTimer(Timer): def stop(self, send: bool = True) -> None: super().stop(send) - if self.name and send and self.duration: - self.otel_logger.metrics_map.set_gauge_value( - full_name(prefix=self.otel_logger.prefix, name=self.name), self.duration, False, self.tags + if self.name and send and self.duration is not None: + self.otel_logger.metrics_map.record_histogram_value( + full_name(prefix=self.otel_logger.prefix, name=self.name), self.duration, self.tags ) @@ -278,11 +280,11 @@ class SafeOtelLogger: *, tags: Attributes = None, ) -> None: - """OTel does not have a native timer, stored as a Gauge whose value is elapsed ms.""" + """Record a timing observation as a Histogram to preserve distribution information.""" if self.metrics_validator.test(stat) and name_is_otel_safe(self.prefix, stat): if isinstance(dt, datetime.timedelta): dt = dt.total_seconds() * 1000.0 - self.metrics_map.set_gauge_value(full_name(prefix=self.prefix, name=stat), float(dt), False, tags) + self.metrics_map.record_histogram_value(full_name(prefix=self.prefix, name=stat), float(dt), tags) def timer( self, @@ -314,15 +316,29 @@ class InternalGauge: self.gauge.set(new_value, attributes=self.attributes) +class InternalHistogram: + """Stores a histogram instrument for timer/timing metrics.""" + + def __init__(self, meter, name: str): + otel_safe_name = _get_otel_safe_name(name) + self.histogram = meter.create_histogram(name=otel_safe_name, unit="ms") + log.debug("Created %s as type: %s", otel_safe_name, _type_as_str(self.histogram)) + + def record(self, value: float, tags: Attributes) -> None: + self.histogram.record(value, attributes=tags) + + class MetricsMap: """Stores Otel Instruments.""" def __init__(self, meter): self.meter = meter self.map = {} + self.histograms: dict[str, InternalHistogram] = {} def clear(self) -> None: self.map.clear() + self.histograms.clear() def _create_counter(self, name): """Create a new counter or up_down_counter for the provided name.""" @@ -376,6 +392,21 @@ class MetricsMap: self.map[key].set_value(value, delta) + def record_histogram_value(self, name: str, value: float, tags: Attributes) -> None: + """ + Record a timing observation in a Histogram instrument. + + Unlike a Gauge, a Histogram accumulates all observations so that count, sum, + and bucket distribution are preserved across multiple recordings. + + :param name: The name of the histogram to record. + :param value: The timing observation in milliseconds. + :param tags: Attributes to attach to the observation. + """ + if name not in self.histograms: + self.histograms[name] = InternalHistogram(meter=self.meter, name=name) + self.histograms[name].record(value, tags) + def flush_otel_metrics(): provider = metrics.get_meter_provider() @@ -400,6 +431,15 @@ def get_otel_logger( stat_name_handler: Callable[[str], str] | None = None, statsd_influxdb_enabled: bool = False, ) -> SafeOtelLogger: + """ + Build and return a :class:`SafeOtelLogger` backed by a configured :class:`MeterProvider`. + + Histogram instruments (used for ``timing()`` / ``timer()`` metrics) are aggregated with + :class:`~opentelemetry.sdk.metrics.view.ExponentialBucketHistogramAggregation` + so that bucket boundaries adapt automatically to the observed data range. This avoids + the need to hand-tune explicit bucket boundaries for metrics that span very different + scales (milliseconds to hours). + """ otel_env_config = load_metrics_env_config() effective_service_name: str = otel_env_config.service_name or service_name or "airflow" @@ -453,6 +493,12 @@ def get_otel_logger( MeterProvider( resource=resource, metric_readers=readers, + views=[ + View( + instrument_type=metrics.Histogram, + aggregation=ExponentialBucketHistogramAggregation(), + ) + ], shutdown_on_exit=False, ), ) diff --git a/shared/observability/tests/observability/metrics/test_otel_logger.py b/shared/observability/tests/observability/metrics/test_otel_logger.py index f7b348354d7..3c1369ec44b 100644 --- a/shared/observability/tests/observability/metrics/test_otel_logger.py +++ b/shared/observability/tests/observability/metrics/test_otel_logger.py @@ -25,6 +25,7 @@ from unittest import mock import pytest from opentelemetry.metrics import MeterProvider +from opentelemetry.sdk.metrics.view import ExponentialBucketHistogramAggregation, View from airflow_shared.observability.common import get_otel_data_exporter from airflow_shared.observability.exceptions import InvalidStatsNameException @@ -244,25 +245,28 @@ class TestOtelMetrics: self.stats.timing(name, dt=datetime.timedelta(seconds=123)) - self.meter.get_meter().create_gauge.assert_called_once_with(name=full_name(name)) - expected_value = 123000.0 - assert self.map[full_name(name)].value == expected_value + self.meter.get_meter().create_histogram.assert_called_once_with(name=full_name(name), unit="ms") + self.meter.get_meter().create_histogram.return_value.record.assert_called_once_with( + 123000.0, attributes=None + ) def test_timing_new_metric_with_tags(self, name): tags = {"hello": "world"} - key = _generate_key_name(full_name(name), tags) self.stats.timing(name, dt=1, tags=tags) - self.meter.get_meter().create_gauge.assert_called_once_with(name=full_name(name)) - self.map[key].attributes == tags + self.meter.get_meter().create_histogram.assert_called_once_with(name=full_name(name), unit="ms") + self.meter.get_meter().create_histogram.return_value.record.assert_called_once_with( + 1.0, attributes=tags + ) def test_timing_existing_metric(self, name): self.stats.timing(name, dt=1) self.stats.timing(name, dt=2) - self.meter.get_meter().create_gauge.assert_called_once_with(name=full_name(name)) - assert self.map[full_name(name)].value == 2 + # histogram created only once, but both observations are recorded + self.meter.get_meter().create_histogram.assert_called_once_with(name=full_name(name), unit="ms") + assert self.meter.get_meter().create_histogram.return_value.record.call_count == 2 # For the four test_timer_foo tests below: # time.perf_count() is called once to get the starting timestamp and again @@ -277,7 +281,7 @@ class TestOtelMetrics: expected_duration = 3140.0 assert timer.duration == expected_duration assert mock_time.call_count == 2 - self.meter.get_meter().create_gauge.assert_called_once_with(name=full_name(name)) + self.meter.get_meter().create_histogram.assert_called_once_with(name=full_name(name), unit="ms") @mock.patch.object(time, "perf_counter", side_effect=[0.0, 3.14]) def test_timer_no_name_returns_float_but_does_not_store_value(self, mock_time, name): @@ -288,7 +292,7 @@ class TestOtelMetrics: expected_duration = 3140.0 assert timer.duration == expected_duration assert mock_time.call_count == 2 - self.meter.get_meter().create_gauge.assert_not_called() + self.meter.get_meter().create_histogram.assert_not_called() @mock.patch.object(time, "perf_counter", side_effect=[0.0, 3.14]) def test_timer_start_and_stop_manually_send_false(self, mock_time, name): @@ -301,7 +305,7 @@ class TestOtelMetrics: expected_value = 3140.0 assert timer.duration == expected_value assert mock_time.call_count == 2 - self.meter.get_meter().create_gauge.assert_not_called() + self.meter.get_meter().create_histogram.assert_not_called() @mock.patch.object(time, "perf_counter", side_effect=[0.0, 3.14]) def test_timer_start_and_stop_manually_send_true(self, mock_time, name): @@ -314,7 +318,7 @@ class TestOtelMetrics: expected_value = 3140.0 assert timer.duration == expected_value assert mock_time.call_count == 2 - self.meter.get_meter().create_gauge.assert_called_once_with(name=full_name(name)) + self.meter.get_meter().create_histogram.assert_called_once_with(name=full_name(name), unit="ms") @pytest.mark.parametrize( ( @@ -415,6 +419,18 @@ class TestOtelMetrics: == f"opentelemetry.exporter.otlp.proto.{expected_exporter_module}.metric_exporter" ) + @mock.patch("airflow_shared.observability.metrics.otel_logger.metrics") + @mock.patch("airflow_shared.observability.metrics.otel_logger.MeterProvider") + def test_get_otel_logger_uses_exponential_histogram_view(self, mock_provider, mock_metrics): + get_otel_logger(host="localhost", port=4318) + + call_kwargs = mock_provider.call_args.kwargs + views = call_kwargs["views"] + assert len(views) == 1 + view = views[0] + assert isinstance(view, View) + assert isinstance(view._aggregation, ExponentialBucketHistogramAggregation) + def test_atexit_flush_on_process_exit(self): """ Run a process that initializes a logger, creates a stat and then exits. @@ -422,8 +438,11 @@ class TestOtelMetrics: The logger initialization registers an atexit hook. Test that the hook runs and flushes the created stat at shutdown. """ - test_module_name = "tests.observability.metrics.test_otel_logger" - function_call_str = f"import {test_module_name} as m; m.mock_service_run()" + function_call_str = ( + "from airflow_shared.observability.metrics.otel_logger import get_otel_logger; " + "logger = get_otel_logger(debug=True); " + "logger.incr('my_test_stat')" + ) proc = subprocess.run( [sys.executable, "-c", function_call_str],
