This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c2916954582 Refactor stat helper methods on DagFileProcessorManager
(#44818)
c2916954582 is described below
commit c29169545825fdc9486db0d6e2281b473518ff05
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Tue Dec 10 17:29:07 2024 +0000
Refactor stat helper methods on DagFileProcessorManager (#44818)
This is some cleanup/preparatory work in order to swap the Dag processor
over
to use the TaskSDK, and this small change is done to make the future work
easier.
The main bulk of this change is to remove the `get_*` helper methods and
make
`_file_stats` a defaultdict instead, and then also swap the DagFileStat
class
from a NamedTuple (which can't have defaults) to an attrs-defined class
which
does.
To make some of the places of use nicer/still one line, the type of
last_duration was changed from a timedelta to a float, as this was what the
`get_last_runtime` method did, and we don't need any of the capabilities of
a
timedelta object elsewhere.
---
airflow/dag_processing/manager.py | 134 ++++++++---------------------------
tests/dag_processing/test_manager.py | 11 +--
2 files changed, 37 insertions(+), 108 deletions(-)
diff --git a/airflow/dag_processing/manager.py
b/airflow/dag_processing/manager.py
index d0fbcf44d95..2df7890a48f 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -31,12 +31,13 @@ import sys
import time
import zipfile
from collections import defaultdict, deque
-from collections.abc import Iterator
+from collections.abc import Iterator, MutableMapping
from datetime import datetime, timedelta
from importlib import import_module
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, NamedTuple, cast
+import attrs
from setproctitle import setproctitle
from sqlalchemy import delete, select, update
from tabulate import tabulate
@@ -81,15 +82,16 @@ class DagParsingStat(NamedTuple):
all_files_processed: bool
-class DagFileStat(NamedTuple):
[email protected]
+class DagFileStat:
"""Information about single processing of one file."""
- num_dags: int
- import_errors: int
- last_finish_time: datetime | None
- last_duration: timedelta | None
- run_count: int
- last_num_of_db_queries: int
+ num_dags: int = 0
+ import_errors: int = 0
+ last_finish_time: datetime | None = None
+ last_duration: float | None = None
+ run_count: int = 0
+ last_num_of_db_queries: int = 0
class DagParsingSignal(enum.Enum):
@@ -353,15 +355,6 @@ class DagFileProcessorManager(LoggingMixin):
:param async_mode: whether to start the manager in async mode
"""
- DEFAULT_FILE_STAT = DagFileStat(
- num_dags=0,
- import_errors=0,
- last_finish_time=None,
- last_duration=None,
- run_count=0,
- last_num_of_db_queries=0,
- )
-
def __init__(
self,
dag_directory: os.PathLike[str],
@@ -416,7 +409,7 @@ class DagFileProcessorManager(LoggingMixin):
self._num_run = 0
# Map from file path to stats about the file
- self._file_stats: dict[str, DagFileStat] = {}
+ self._file_stats: MutableMapping[str, DagFileStat] =
defaultdict(DagFileStat)
# Last time that the DAG dir was traversed to look for files
self.last_dag_dir_refresh_time =
timezone.make_aware(datetime.fromtimestamp(0))
@@ -488,7 +481,7 @@ class DagFileProcessorManager(LoggingMixin):
elapsed_time_since_refresh = (now -
self.last_deactivate_stale_dags_time).total_seconds()
if elapsed_time_since_refresh > self.parsing_cleanup_interval:
last_parsed = {
- fp: self.get_last_finish_time(fp) for fp in self.file_paths if
self.get_last_finish_time(fp)
+ fp: stat.last_finish_time for fp, stat in
self._file_stats.items() if stat.last_finish_time
}
DagFileProcessorManager.deactivate_stale_dags(
last_parsed=last_parsed,
@@ -501,7 +494,7 @@ class DagFileProcessorManager(LoggingMixin):
@provide_session
def deactivate_stale_dags(
cls,
- last_parsed: dict[str, datetime | None],
+ last_parsed: dict[str, datetime],
dag_directory: str,
stale_dag_threshold: int,
session: Session = NEW_SESSION,
@@ -655,7 +648,9 @@ class DagFileProcessorManager(LoggingMixin):
span.add_event(name="print_stat")
self._print_stat()
- all_files_processed = all(self.get_last_finish_time(x) is not
None for x in self.file_paths)
+ all_files_processed = all(
+ self._file_stats[x].last_finish_time is not None for x in
self.file_paths
+ )
max_runs_reached = self.max_runs_reached()
try:
@@ -872,30 +867,27 @@ class DagFileProcessorManager(LoggingMixin):
rows = []
now = timezone.utcnow()
for file_path in known_file_paths:
- last_runtime = self.get_last_runtime(file_path)
- num_dags = self.get_last_dag_count(file_path)
- num_errors = self.get_last_error_count(file_path)
+ stat = self._file_stats[file_path]
file_name = Path(file_path).stem
processor_pid = self.get_pid(file_path)
processor_start_time = self.get_start_time(file_path)
runtime = (now - processor_start_time) if processor_start_time
else None
- last_run = self.get_last_finish_time(file_path)
+ last_run = stat.last_finish_time
if last_run:
seconds_ago = (now - last_run).total_seconds()
Stats.gauge(f"dag_processing.last_run.seconds_ago.{file_name}", seconds_ago)
- last_num_of_db_queries = self.get_last_num_of_db_queries(file_path)
- Stats.gauge(f"dag_processing.last_num_of_db_queries.{file_name}",
last_num_of_db_queries)
+ Stats.gauge(f"dag_processing.last_num_of_db_queries.{file_name}",
stat.last_num_of_db_queries)
rows.append(
(
file_path,
processor_pid,
runtime,
- num_dags,
- num_errors,
- last_runtime,
+ stat.num_dags,
+ stat.import_errors,
+ stat.last_duration,
last_run,
- last_num_of_db_queries,
+ stat.last_num_of_db_queries,
)
)
@@ -955,58 +947,6 @@ class DagFileProcessorManager(LoggingMixin):
"""
return [x.pid for x in self._processors.values()]
- def get_last_runtime(self, file_path) -> float | None:
- """
- Retrieve the last processing time of a specific path.
-
- :param file_path: the path to the file that was processed
- :return: the runtime (in seconds) of the process of the last run, or
- None if the file was never processed.
- """
- stat = self._file_stats.get(file_path)
- return stat.last_duration.total_seconds() if stat and
stat.last_duration else None
-
- def get_last_dag_count(self, file_path) -> int | None:
- """
- Retrieve the total DAG count at a specific path.
-
- :param file_path: the path to the file that was processed
- :return: the number of dags loaded from that file, or None if the file
was never processed.
- """
- stat = self._file_stats.get(file_path)
- return stat.num_dags if stat else None
-
- def get_last_error_count(self, file_path) -> int | None:
- """
- Retrieve the total number of errors from processing a specific path.
-
- :param file_path: the path to the file that was processed
- :return: the number of import errors from processing, or None if the
file was never processed.
- """
- stat = self._file_stats.get(file_path)
- return stat.import_errors if stat else None
-
- def get_last_num_of_db_queries(self, file_path) -> int | None:
- """
- Retrieve the number of queries performed to the Airflow database
during last parsing of the file.
-
- :param file_path: the path to the file that was processed
- :return: the number of queries performed to the Airflow database
during last parsing of the file,
- or None if the file was never processed.
- """
- stat = self._file_stats.get(file_path)
- return stat.last_num_of_db_queries if stat else None
-
- def get_last_finish_time(self, file_path) -> datetime | None:
- """
- Retrieve the last completion time for processing a specific path.
-
- :param file_path: the path to the file that was processed
- :return: the finish time of the process of the last run, or None if
the file was never processed.
- """
- stat = self._file_stats.get(file_path)
- return stat.last_finish_time if stat else None
-
def get_start_time(self, file_path) -> datetime | None:
"""
Retrieve the last start time for processing a specific path.
@@ -1019,15 +959,6 @@ class DagFileProcessorManager(LoggingMixin):
return self._processors[file_path].start_time
return None
- def get_run_count(self, file_path) -> int:
- """
- Return the number of times the given file has been parsed.
-
- :param file_path: the path to the file that's being processed.
- """
- stat = self._file_stats.get(file_path)
- return stat.run_count if stat else 0
-
def get_dag_directory(self) -> str:
"""Return the dag_director as a string."""
if isinstance(self._dag_directory, Path):
@@ -1092,13 +1023,13 @@ class DagFileProcessorManager(LoggingMixin):
num_dags = 0
last_num_of_db_queries = 0
- last_duration = last_finish_time - processor.start_time
+ last_duration = (last_finish_time -
processor.start_time).total_seconds()
stat = DagFileStat(
num_dags=num_dags,
import_errors=count_import_errors,
last_finish_time=last_finish_time,
last_duration=last_duration,
- run_count=self.get_run_count(processor.file_path) + 1,
+ run_count=self._file_stats[processor.file_path].run_count + 1,
last_num_of_db_queries=last_num_of_db_queries,
)
self._file_stats[processor.file_path] = stat
@@ -1110,7 +1041,7 @@ class DagFileProcessorManager(LoggingMixin):
span.set_attributes(
{
"file_path": processor.file_path,
- "run_count": self.get_run_count(processor.file_path) + 1,
+ "run_count": stat.run_count,
}
)
@@ -1146,8 +1077,8 @@ class DagFileProcessorManager(LoggingMixin):
span.end(end_time=datetime_to_nano(last_finish_time))
- Stats.timing(f"dag_processing.last_duration.{file_name}",
last_duration)
- Stats.timing("dag_processing.last_duration", last_duration,
tags={"file_name": file_name})
+ Stats.timing(f"dag_processing.last_duration.{file_name}",
last_duration * 1000.0)
+ Stats.timing("dag_processing.last_duration", last_duration * 1000.0,
tags={"file_name": file_name})
def collect_results(self) -> None:
"""Collect the result from any finished DAG processors."""
@@ -1219,7 +1150,6 @@ class DagFileProcessorManager(LoggingMixin):
if file_path not in self._file_stats:
# We found new file after refreshing dir. add to parsing queue
at start
self.log.info("Adding new file %s to parsing queue", file_path)
- self._file_stats[file_path] =
DagFileProcessorManager.DEFAULT_FILE_STAT
self._file_path_queue.appendleft(file_path)
span = Trace.get_current_span()
if span.is_recording():
@@ -1266,7 +1196,7 @@ class DagFileProcessorManager(LoggingMixin):
# from being added to file_path_queue
# unless they were modified recently and parsing mode is
"modified_time"
# in which case we don't honor "self._file_process_interval"
(min_file_process_interval)
- last_finish_time = self.get_last_finish_time(file_path)
+ last_finish_time = self._file_stats[file_path].last_finish_time
if (
last_finish_time is not None
and (now - last_finish_time).total_seconds() <
self._file_process_interval
@@ -1316,8 +1246,6 @@ class DagFileProcessorManager(LoggingMixin):
"Queuing the following files for processing:\n\t%s",
"\n\t".join(files_paths_to_queue)
)
- for file_path in files_paths_to_queue:
- self._file_stats.setdefault(file_path,
DagFileProcessorManager.DEFAULT_FILE_STAT)
self._add_paths_to_queue(files_paths_to_queue, False)
Stats.incr("dag_processing.file_path_queue_update_count")
@@ -1355,8 +1283,8 @@ class DagFileProcessorManager(LoggingMixin):
num_dags=0,
import_errors=1,
last_finish_time=now,
- last_duration=duration,
- run_count=self.get_run_count(file_path) + 1,
+ last_duration=duration.total_seconds(),
+ run_count=self._file_stats[processor.file_path].run_count
+ 1,
last_num_of_db_queries=0,
)
self._file_stats[processor.file_path] = stat
diff --git a/tests/dag_processing/test_manager.py
b/tests/dag_processing/test_manager.py
index 76652740fbc..3154f3a49df 100644
--- a/tests/dag_processing/test_manager.py
+++ b/tests/dag_processing/test_manager.py
@@ -264,7 +264,7 @@ class TestDagProcessorJobRunner:
mock_processor.terminate.side_effect = None
manager._processors["missing_file.txt"] = mock_processor
- manager._file_stats["missing_file.txt"] = DagFileStat(0, 0, None,
None, 0, 0)
+ manager._file_stats["missing_file.txt"] = DagFileStat()
manager.set_file_paths(["abc.txt"])
assert manager._processors == {}
@@ -533,7 +533,7 @@ class TestDagProcessorJobRunner:
assert last_finish_time < file_1_new_mtime
assert (
manager._file_process_interval
- > (freezed_base_time -
manager.get_last_finish_time("file_1.py")).total_seconds()
+ > (freezed_base_time -
manager._file_stats["file_1.py"].last_finish_time).total_seconds()
)
@mock.patch("zipfile.is_zipfile", return_value=True)
@@ -937,17 +937,18 @@ class TestDagProcessorJobRunner:
)
self.run_processor_manager_one_loop(manager, parent_pipe)
- last_runtime = manager.get_last_runtime(manager.file_paths[0])
+ last_runtime = manager._file_stats[manager.file_paths[0]].last_duration
+ assert last_runtime is not None
child_pipe.close()
parent_pipe.close()
statsd_timing_mock.assert_has_calls(
[
- mock.call("dag_processing.last_duration.temp_dag",
timedelta(seconds=last_runtime)),
+ mock.call("dag_processing.last_duration.temp_dag",
last_runtime * 1000.0),
mock.call(
"dag_processing.last_duration",
- timedelta(seconds=last_runtime),
+ last_runtime * 1000.0,
tags={"file_name": "temp_dag"},
),
],