This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 57a7c64a775 Fix broken `dag_processing.total_parse_time` metric (#62128)
57a7c64a775 is described below
commit 57a7c64a77503fef4eb7c6801a28a628a4098535
Author: Nick Stenning <[email protected]>
AuthorDate: Sat Feb 21 00:39:46 2026 +0100
Fix broken `dag_processing.total_parse_time` metric (#62128)
DagFileProcessorManager has been emitting a nonsense value for
`dag_processing.total_parse_time` since 8774f28d76, which reversed the
order in which `emit_metrics` and `prepare_file_queue` (then called
`prepare_file_path_queue`) were called.
Because `prepare_file_path_queue` was responsible for resetting
`self._parsing_start_time`, `emit_metrics` assumed it would be called
once the file queue had been cleared, but crucially before
`prepare_file_queue` was called to refill the queue.
Additionally, there was no guarantee that we'd parsed any files at all
since the last time the metric was emitted. If no work was due, we'd
gladly emit near-zero metrics every time around the while loop.
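To illustrate, here is a stripped-down, hypothetical sketch of the
reversed ordering (class and method bodies are simplified; print()
stands in for Stats.gauge):

    import time

    class BrokenOrderingSketch:
        def prepare_file_queue(self) -> None:
            # Resets the clock, as prepare_file_path_queue used to do...
            self._parsing_start_time = time.perf_counter()

        def emit_metrics(self) -> None:
            # ...so when this runs immediately afterwards, the reported
            # "parse time" is just the microseconds between the two calls.
            parse_time = time.perf_counter() - self._parsing_start_time
            print("dag_processing.total_parse_time", parse_time)

        def loop_iteration(self) -> None:
            # Ordering after 8774f28d76: refill the queue first, emit second.
            self.prepare_file_queue()
            self.emit_metrics()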
I've rearranged things in a way that I hope will be harder to
accidentally break in future:
- `self._parsing_start_time` may be reset whenever files are added to
the queue, if it was not set already.
- metrics are emitted when `prepare_file_queue` is called -- when the
queue is empty -- but only if `self._parsing_start_time` is set,
meaning only if we've actually parsed any files since the last time
metrics were emitted.
Together, this means we should now emit metrics once per parsing loop.
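In outline, the new bookkeeping works roughly like this (a minimal,
hypothetical sketch; the real manager also tracks per-file stats,
bundles and processor lifecycles, and reports via Stats.gauge rather
than print):

    import time

    class FixedBookkeepingSketch:
        def __init__(self) -> None:
            self._file_queue: list[str] = []
            self._parsing_start_time: float | None = None

        def _add_files_to_queue(self, files: list[str]) -> None:
            self._file_queue.extend(files)
            # Start the clock only the first time files are queued since
            # metrics were last emitted.
            if self._parsing_start_time is None and self._file_queue:
                self._parsing_start_time = time.perf_counter()

        def prepare_file_queue(self) -> None:
            # Only called when the queue has drained: emit metrics for the
            # batch we just finished, then clear the start time so the next
            # batch restarts the clock.
            if self._parsing_start_time is not None:
                parse_time = time.perf_counter() - self._parsing_start_time
                print("dag_processing.total_parse_time", parse_time)
                self._parsing_start_time = None
            # ...refill self._file_queue here...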
I've added a test which fails on main and passes on this branch.
---
airflow-core/src/airflow/dag_processing/manager.py | 101 ++++++++++++---------
airflow-core/src/airflow/typing_compat.py | 6 +-
.../tests/unit/dag_processing/test_manager.py | 31 ++++++-
3 files changed, 93 insertions(+), 45 deletions(-)
diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py
index c0dff728806..ba4715a8ae7 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -36,7 +36,7 @@ from dataclasses import dataclass, field
from datetime import datetime, timedelta
from operator import attrgetter, itemgetter
from pathlib import Path
-from typing import TYPE_CHECKING, Any, NamedTuple, cast
+from typing import TYPE_CHECKING, Any, Literal, NamedTuple, cast
import attrs
import structlog
@@ -65,6 +65,7 @@ from airflow.observability.metrics import stats_utils
from airflow.observability.trace import DebugTrace
from airflow.sdk import SecretCache
from airflow.sdk.log import init_log_file, logging_processors
+from airflow.typing_compat import assert_never
from airflow.utils.file import list_py_file_paths, might_contain_dag
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.net import get_hostname
@@ -208,7 +209,7 @@ class DagFileProcessorManager(LoggingMixin):
_processors: dict[DagFileInfo, DagFileProcessorProcess] = attrs.field(factory=dict, init=False)
- _parsing_start_time: float = attrs.field(init=False)
+ _parsing_start_time: float | None = attrs.field(default=None, init=False)
_num_run: int = attrs.field(default=0, init=False)
_callback_to_execute: dict[DagFileInfo, list[CallbackRequest]] = attrs.field(
@@ -395,7 +396,6 @@ class DagFileProcessorManager(LoggingMixin):
# clear down, we must have cleared all files found from scanning the dags dir _and_ have
# cleared all files added as a result of callbacks
self.prepare_file_queue(known_files=known_files)
- self.emit_metrics()
self._start_new_processes()
@@ -453,16 +453,8 @@ class DagFileProcessorManager(LoggingMixin):
def _queue_requested_files_for_parsing(self) -> None:
"""Queue any files requested for parsing as requested by users via
UI/API."""
files = self._get_priority_files()
- bundles_to_refresh: set[str] = set()
- for file in files:
- # Try removing the file if already present
- with contextlib.suppress(ValueError):
- self._file_queue.remove(file)
- # enqueue file to the start of the queue.
- self._file_queue.appendleft(file)
- bundles_to_refresh.add(file.bundle_name)
-
- self._force_refresh_bundles |= bundles_to_refresh
+ self._add_files_to_queue(files, mode="frontprio")
+ self._force_refresh_bundles |= {file.bundle_name for file in files}
if self._force_refresh_bundles:
self.log.info("Bundles being force refreshed: %s", ",
".join(self._force_refresh_bundles))
@@ -544,7 +536,7 @@ class DagFileProcessorManager(LoggingMixin):
bundle_version=request.bundle_version,
)
self._callback_to_execute[file_info].append(request)
- self._add_files_to_queue([file_info], True)
+ self._add_files_to_queue([file_info], mode="front")
Stats.incr("dag_processing.other_callback_count")
def _refresh_dag_bundles(self, known_files: dict[str, set[DagFileInfo]]):
@@ -1021,7 +1013,7 @@ class DagFileProcessorManager(LoggingMixin):
if new_files:
self.log.info("Adding %d new files to the front of the queue",
len(new_files))
- self._add_files_to_queue(new_files, True)
+ self._add_files_to_queue(new_files, mode="front")
def _resort_file_queue(self):
if self._file_parsing_sort_mode == "modified_time" and self._file_queue:
@@ -1076,7 +1068,15 @@ class DagFileProcessorManager(LoggingMixin):
Note this method is only called when the file path queue is empty
"""
- self._parsing_start_time = time.perf_counter()
+ # We only emit metrics after processing all files in the queue. If `self._parsing_start_time` is None
+ # when this method is called, no files have yet been added to the queue so we shouldn't emit metrics.
+ if self._parsing_start_time is not None:
+ emit_metrics(
+ parse_time=time.perf_counter() - self._parsing_start_time,
+ stats=list(self._file_stats.values()),
+ )
+ self._parsing_start_time = None
+
# If the file path is already being processed, or if a file was
# processed recently, wait until the next batch
in_progress = set(self._processors)
@@ -1122,7 +1122,7 @@ class DagFileProcessorManager(LoggingMixin):
"Queuing the following files for processing:\n\t%s",
"\n\t".join(str(f.rel_path) for f in to_queue),
)
- self._add_files_to_queue(to_queue, False)
+ self._add_files_to_queue(to_queue, mode="back")
Stats.incr("dag_processing.file_path_queue_update_count")
def _kill_timed_out_processors(self):
@@ -1160,13 +1160,34 @@ class DagFileProcessorManager(LoggingMixin):
processor = self._processors.pop(proc)
processor.logger_filehandle.close()
- def _add_files_to_queue(self, files: list[DagFileInfo], add_at_front: bool):
+ def _add_files_to_queue(
+ self,
+ files: list[DagFileInfo],
+ *,
+ mode: Literal["front", "back", "frontprio"],
+ ):
"""Add stuff to the back or front of the file queue, unless it's
already present."""
- new_files = list(f for f in files if f not in self._file_queue)
- if add_at_front:
+ if mode == "frontprio":
+ for file in files:
+ # Try removing the file if already present
+ with contextlib.suppress(ValueError):
+ self._file_queue.remove(file)
+ # enqueue file to the start of the queue.
+ self._file_queue.appendleft(file)
+ elif mode == "front":
+ new_files = list(f for f in files if f not in self._file_queue)
self._file_queue.extendleft(new_files)
- else:
+ elif mode == "back":
+ new_files = list(f for f in files if f not in self._file_queue)
self._file_queue.extend(new_files)
+ else:
+ assert_never(mode)
+
+ # If we've just added files to the queue for the first time since metrics were last emitted, reset the
+ # parse time counter.
+ if self._parsing_start_time is None and self._file_queue:
+ self._parsing_start_time = time.perf_counter()
+
Stats.gauge("dag_processing.file_path_queue_size",
len(self._file_queue))
def max_runs_reached(self):
@@ -1193,27 +1214,25 @@ class DagFileProcessorManager(LoggingMixin):
if pids_to_kill:
kill_child_processes_by_pids(pids_to_kill)
- def emit_metrics(self):
- """
- Emit metrics about dag parsing summary.
- This is called once every time around the parsing "loop" - i.e. after
- all files have been parsed.
- """
- with DebugTrace.start_span(span_name="emit_metrics", component="DagFileProcessorManager") as span:
- parse_time = time.perf_counter() - self._parsing_start_time
- Stats.gauge("dag_processing.total_parse_time", parse_time)
- Stats.gauge("dagbag_size", sum(stat.num_dags for stat in
self._file_stats.values()))
- Stats.gauge(
- "dag_processing.import_errors", sum(stat.import_errors for
stat in self._file_stats.values())
- )
- span.set_attributes(
- {
- "total_parse_time": parse_time,
- "dag_bag_size": sum(stat.num_dags for stat in
self._file_stats.values()),
- "import_errors": sum(stat.import_errors for stat in
self._file_stats.values()),
- }
- )
+def emit_metrics(*, parse_time: float, stats: Sequence[DagFileStat]):
+ """
+ Emit metrics about dag parsing summary.
+
+ This is called once every time around the parsing "loop" - i.e. after
+ all files have been parsed.
+ """
+ with DebugTrace.start_span(span_name="emit_metrics", component="DagFileProcessorManager") as span:
+ Stats.gauge("dag_processing.total_parse_time", parse_time)
+ Stats.gauge("dagbag_size", sum(stat.num_dags for stat in stats))
+ Stats.gauge("dag_processing.import_errors", sum(stat.import_errors for
stat in stats))
+ span.set_attributes(
+ {
+ "total_parse_time": parse_time,
+ "dag_bag_size": sum(stat.num_dags for stat in stats),
+ "import_errors": sum(stat.import_errors for stat in stats),
+ }
+ )
def process_parse_results(
diff --git a/airflow-core/src/airflow/typing_compat.py b/airflow-core/src/airflow/typing_compat.py
index e1efb87067b..31b6e6ecaa5 100644
--- a/airflow-core/src/airflow/typing_compat.py
+++ b/airflow-core/src/airflow/typing_compat.py
@@ -19,7 +19,7 @@
from __future__ import annotations
-__all__ = ["Literal", "ParamSpec", "Self", "TypeAlias", "TypeGuard", "Unpack"]
+__all__ = ["Literal", "ParamSpec", "Self", "TypeAlias", "TypeGuard", "Unpack",
"assert_never"]
import sys
@@ -27,6 +27,6 @@ import sys
from typing import Literal, ParamSpec, TypeAlias, TypeGuard
if sys.version_info >= (3, 11):
- from typing import Self, Unpack
+ from typing import Self, Unpack, assert_never
else:
- from typing_extensions import Self, Unpack
+ from typing_extensions import Self, Unpack, assert_never
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py
index dd8eb88cdb7..edfc3a505f0 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -26,7 +26,7 @@ import shutil
import signal
import textwrap
import time
-from collections import deque
+from collections import defaultdict, deque
from datetime import datetime, timedelta
from pathlib import Path
from socket import socket, socketpair
@@ -1591,3 +1591,32 @@ class TestDagFileProcessorManager:
stats_init_mock.assert_called_once()
call_kwargs = stats_init_mock.call_args.kwargs
assert "factory" in call_kwargs
+
+ @mock.patch("airflow.dag_processing.manager.Stats.gauge")
+ def test_stats_total_parse_time(self, statsd_gauge_mock, tmp_path, configure_testing_dag_bundle):
+ key = "dag_processing.total_parse_time"
+ gauge_values = defaultdict(list)
+ statsd_gauge_mock.side_effect = lambda name, value: gauge_values[name].append(value)
+
+ dag_path = tmp_path / "temp_dag.py"
+ dag_code = textwrap.dedent(
+ """
+ from airflow import DAG
+ dag = DAG(dag_id='temp_dag')
+ """
+ )
+ dag_path.write_text(dag_code)
+
+ with configure_testing_dag_bundle(tmp_path):
+ manager = DagFileProcessorManager(max_runs=0)
+
+ for _ in range(3):
+ manager.max_runs += 1
+ manager.run()
+
+ assert key in gauge_values
+ assert len(gauge_values[key]) == 1
+ assert gauge_values[key][0] >= 1e-4
+
+ dag_path.touch() # make the loop run faster
+ gauge_values.clear()