This is an automated email from the ASF dual-hosted git repository.
shahar1 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1622eccbfaf Dag processor: reduce file-queue dedup from O(N²) to O(N) with OrderedDict (#67750)
1622eccbfaf is described below
commit 1622eccbfaf4f75dd8e9f33b7b8ceac7758ae283
Author: Shahar Epstein <[email protected]>
AuthorDate: Wed Jun 3 07:24:27 2026 +0300
Dag processor: reduce file-queue dedup from O(N²) to O(N) with OrderedDict (#67750)
---
airflow-core/src/airflow/dag_processing/manager.py | 30 ++++-----
.../tests/unit/dag_processing/test_manager.py | 72 +++++++++++-----------
2 files changed, 51 insertions(+), 51 deletions(-)
diff --git a/airflow-core/src/airflow/dag_processing/manager.py
b/airflow-core/src/airflow/dag_processing/manager.py
index 691237cc46e..fc00b6730c8 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -19,7 +19,6 @@
from __future__ import annotations
-import contextlib
import functools
import gc
import inspect
@@ -31,7 +30,7 @@ import signal
import sys
import time
import zipfile
-from collections import defaultdict, deque
+from collections import OrderedDict, defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from operator import attrgetter, itemgetter
@@ -237,7 +236,7 @@ class DagFileProcessorManager(LoggingMixin):
heartbeat: Callable[[], None] = attrs.field(default=lambda: None)
"""An overridable heartbeat called once every time around the loop"""
- _file_queue: deque[DagFileInfo] = attrs.field(factory=deque, init=False)
+ _file_queue: OrderedDict[DagFileInfo, None] =
attrs.field(factory=OrderedDict, init=False)
_file_stats: dict[DagFileInfo, DagFileStat] = attrs.field(
factory=lambda: defaultdict(DagFileStat), init=False
)
@@ -1067,7 +1066,7 @@ class DagFileProcessorManager(LoggingMixin):
def purge_removed_files_from_queue(self, present: set[DagFileInfo]):
"""Remove from queue any files no longer observed locally."""
present_keys = {file.presence_key for file in present}
- self._file_queue = deque(x for x in self._file_queue if x.presence_key
in present_keys)
+ self._file_queue = OrderedDict((x, None) for x in self._file_queue if
x.presence_key in present_keys)
stats.gauge("dag_processing.file_path_queue_size",
len(self._file_queue))
def remove_orphaned_file_stats(self, present: set[DagFileInfo]):
@@ -1300,7 +1299,7 @@ class DagFileProcessorManager(LoggingMixin):
def _start_new_processes(self):
"""Start more processors if we have enough slots and files to
process."""
while self._parallelism > len(self._processors) and self._file_queue:
- file = self._file_queue.popleft()
+ file, _ = self._file_queue.popitem(last=False)
# Stop creating duplicate processor i.e. processor with the same
filepath
if file in self._processors:
continue
@@ -1347,7 +1346,7 @@ class DagFileProcessorManager(LoggingMixin):
sorted_regular_files, _ = self._sort_by_mtime(regular_files)
# Put callback files at the front, then sorted regular files
- self._file_queue = deque(callback_files + sorted_regular_files)
+ self._file_queue = OrderedDict.fromkeys(callback_files +
sorted_regular_files)
def _sort_by_mtime(self, files: Iterable[DagFileInfo]):
file_stats_by_presence_key = {file.presence_key: stat for file, stat
in self._file_stats.items()}
@@ -1512,17 +1511,18 @@ class DagFileProcessorManager(LoggingMixin):
"""Add stuff to the back or front of the file queue, unless it's
already present."""
if mode == "frontprio":
for file in files:
- # Try removing the file if already present
- with contextlib.suppress(ValueError):
- self._file_queue.remove(file)
- # enqueue file to the start of the queue.
- self._file_queue.appendleft(file)
+ self._file_queue.pop(file, None)
+ self._file_queue[file] = None
+ self._file_queue.move_to_end(file, last=False)
elif mode == "front":
- new_files = list(f for f in files if f not in self._file_queue)
- self._file_queue.extendleft(new_files)
+ for file in files:
+ if file not in self._file_queue:
+ self._file_queue[file] = None
+ self._file_queue.move_to_end(file, last=False)
elif mode == "back":
- new_files = list(f for f in files if f not in self._file_queue)
- self._file_queue.extend(new_files)
+ for file in files:
+ if file not in self._file_queue:
+ self._file_queue[file] = None
else:
assert_never(mode)
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py
b/airflow-core/tests/unit/dag_processing/test_manager.py
index 33aaa828888..941d090f229 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -28,7 +28,7 @@ import signal
import textwrap
import time
import zipfile
-from collections import defaultdict, deque
+from collections import OrderedDict, defaultdict
from datetime import datetime, timedelta
from pathlib import Path
from socket import socket, socketpair
@@ -372,7 +372,7 @@ class TestDagFileProcessorManager:
file_1 = DagFileInfo(bundle_name="testing",
rel_path=Path("file_1.py"), bundle_path=TEST_DAGS_FOLDER)
file_2 = DagFileInfo(bundle_name="testing",
rel_path=Path("file_2.py"), bundle_path=TEST_DAGS_FOLDER)
file_3 = DagFileInfo(bundle_name="testing",
rel_path=Path("file_3.py"), bundle_path=TEST_DAGS_FOLDER)
- manager._file_queue = deque([file_1, file_2, file_3])
+ manager._file_queue = OrderedDict.fromkeys([file_1, file_2, file_3])
# Mock that only one processor exists. This processor runs with
'file_1'
manager._processors[file_1] = MagicMock()
@@ -388,7 +388,7 @@ class TestDagFileProcessorManager:
assert file_1 in manager._processors.keys()
assert file_2 in manager._processors.keys()
- assert deque([file_3]) == manager._file_queue
+ assert OrderedDict.fromkeys([file_3]) == manager._file_queue
def
test_handle_removed_files_when_processor_file_path_not_in_new_file_paths(self):
"""Ensure processors and file stats are removed when the file path is
not in the new file paths"""
@@ -438,21 +438,21 @@ class TestDagFileProcessorManager:
versioned_file = _get_versioned_file_info("callbacks.py")
present_file = _get_file_infos(["callbacks.py"])[0]
- manager._file_queue = deque([versioned_file])
+ manager._file_queue = OrderedDict.fromkeys([versioned_file])
manager.purge_removed_files_from_queue(present={present_file})
- assert manager._file_queue == deque([versioned_file])
+ assert manager._file_queue == OrderedDict.fromkeys([versioned_file])
def
test_purge_removed_files_drops_versioned_callback_file_when_truly_absent(self):
manager = DagFileProcessorManager(max_runs=1)
versioned_file = _get_versioned_file_info("callbacks.py")
- manager._file_queue = deque([versioned_file])
+ manager._file_queue = OrderedDict.fromkeys([versioned_file])
manager.purge_removed_files_from_queue(present=set())
- assert manager._file_queue == deque()
+ assert manager._file_queue == OrderedDict()
def
test_terminate_orphan_processes_keeps_versioned_callback_processor_when_unversioned_file_is_present(
self,
@@ -511,9 +511,9 @@ class TestDagFileProcessorManager:
manager = DagFileProcessorManager(max_runs=1)
known_files = {"some-bundle": set(dag_files)}
- assert manager._file_queue == deque()
+ assert manager._file_queue == OrderedDict()
manager.prepare_file_queue(known_files=known_files)
- assert manager._file_queue == deque(ordered_dag_files)
+ assert manager._file_queue == OrderedDict.fromkeys(ordered_dag_files)
@conf_vars({("dag_processor", "file_parsing_sort_mode"):
"random_seeded_by_host"})
def test_files_sorted_random_seeded_by_host(self):
@@ -522,10 +522,10 @@ class TestDagFileProcessorManager:
known_files = {"anything": f_infos}
manager = DagFileProcessorManager(max_runs=1)
- assert manager._file_queue == deque()
+ assert manager._file_queue == OrderedDict()
manager.prepare_file_queue(known_files=known_files) # using list over
test for reproducibility
random.Random(get_hostname()).shuffle(f_infos)
- expected = deque(f_infos)
+ expected = OrderedDict.fromkeys(f_infos)
assert manager._file_queue == expected
# Verify running it again produces same order
@@ -548,7 +548,7 @@ class TestDagFileProcessorManager:
manager = DagFileProcessorManager(max_runs=1)
- assert manager._file_queue == deque()
+ assert manager._file_queue == OrderedDict()
manager.prepare_file_queue(known_files={"any": set(dag_files)})
ordered_files = _get_file_infos(
[
@@ -558,7 +558,7 @@ class TestDagFileProcessorManager:
"file_2-ss=2.0.py",
]
)
- assert manager._file_queue == deque(ordered_files)
+ assert manager._file_queue == OrderedDict.fromkeys(ordered_files)
@conf_vars({("dag_processor", "file_parsing_sort_mode"): "modified_time"})
@mock.patch("airflow.utils.file.os.path.getmtime", new=mock_get_mtime)
@@ -570,7 +570,7 @@ class TestDagFileProcessorManager:
manager = DagFileProcessorManager(max_runs=1)
manager.prepare_file_queue(known_files={"any": set(file_infos)})
ordered_files = _get_file_infos(["file_2-ss=3.0.py",
"file_3-ss=2.0.py"])
- assert manager._file_queue == deque(ordered_files)
+ assert manager._file_queue == OrderedDict.fromkeys(ordered_files)
@conf_vars({("dag_processor", "file_parsing_sort_mode"): "modified_time"})
@mock.patch("airflow.utils.file.os.path.getmtime", new=mock_get_mtime)
@@ -597,7 +597,7 @@ class TestDagFileProcessorManager:
"file_4-ss=1.0.py",
]
)
- assert manager._file_queue == deque(ordered_files)
+ assert manager._file_queue == OrderedDict.fromkeys(ordered_files)
def test_add_new_files_to_queue_behavior(self):
"""
@@ -615,7 +615,7 @@ class TestDagFileProcessorManager:
# Setup:
# file_1 is already in the queue
- manager._file_queue = deque([file_1])
+ manager._file_queue = OrderedDict.fromkeys([file_1])
# file_3 is currently being processed
manager._processors[file_3] = MagicMock()
@@ -641,7 +641,7 @@ class TestDagFileProcessorManager:
parsed_versioned_file = _get_versioned_file_info("file_4.py")
new_file = _get_file_infos(["file_2.py"])[0]
- manager._file_queue = deque([queued_versioned_file])
+ manager._file_queue = OrderedDict.fromkeys([queued_versioned_file])
manager._processors[processed_versioned_file] = MagicMock()
manager._file_stats[parsed_versioned_file] = DagFileStat(num_dags=1)
@@ -679,7 +679,7 @@ class TestDagFileProcessorManager:
# Populate queue with unsorted files
# Queue: [file_1 (100), file_2 (200)]
- manager._file_queue = deque([dag_files[0], dag_files[1]])
+ manager._file_queue = OrderedDict.fromkeys([dag_files[0],
dag_files[1]])
manager._resort_file_queue()
@@ -697,7 +697,7 @@ class TestDagFileProcessorManager:
manager = DagFileProcessorManager(max_runs=1)
# Populate queue in non-alphabetical order
- manager._file_queue = deque([file_b, file_a])
+ manager._file_queue = OrderedDict.fromkeys([file_b, file_a])
manager._resort_file_queue()
@@ -727,7 +727,7 @@ class TestDagFileProcessorManager:
manager = DagFileProcessorManager(max_runs=1)
# Queue order: callback_1, callback_2, regular_1, regular_2
- manager._file_queue = deque([dag_files[0], dag_files[1], dag_files[2],
dag_files[3]])
+ manager._file_queue = OrderedDict.fromkeys([dag_files[0],
dag_files[1], dag_files[2], dag_files[3]])
# Both callback files have pending callbacks
manager._callback_to_execute[dag_files[0]] = [MagicMock()]
@@ -762,22 +762,22 @@ class TestDagFileProcessorManager:
dag_file: DagFileStat(1, 0, last_finish_time, 1.0, 1, 1),
}
with time_machine.travel(freezed_base_time):
- assert manager._file_queue == deque()
+ assert manager._file_queue == OrderedDict()
# File Path Queue will be empty as the "modified time" < "last
finish time"
manager.prepare_file_queue(known_files=known_files)
- assert manager._file_queue == deque()
+ assert manager._file_queue == OrderedDict()
# Simulate the DAG modification by using modified_time which is greater
# than the last_parse_time but still less than now -
min_file_process_interval
file_1_new_mtime = freezed_base_time - timedelta(seconds=5)
file_1_new_mtime_ts = file_1_new_mtime.timestamp()
with time_machine.travel(freezed_base_time):
- assert manager._file_queue == deque()
+ assert manager._file_queue == OrderedDict()
# File Path Queue will be empty as the "modified time" < "last
finish time"
mock_getmtime.side_effect = [file_1_new_mtime_ts]
manager.prepare_file_queue(known_files=known_files)
# Check that file is added to the queue even though file was just
recently passed
- assert manager._file_queue == deque([dag_file])
+ assert manager._file_queue == OrderedDict.fromkeys([dag_file])
assert last_finish_time < file_1_new_mtime
assert (
manager._file_process_interval
@@ -794,7 +794,7 @@ class TestDagFileProcessorManager:
manager.prepare_file_queue(known_files={"testing": {known_file}})
- assert manager._file_queue == deque()
+ assert manager._file_queue == OrderedDict()
@conf_vars({("dag_processor", "file_parsing_sort_mode"): "alphabetical"})
def
test_prepare_file_queue_skips_file_when_versioned_stat_is_at_run_limit(self):
@@ -806,7 +806,7 @@ class TestDagFileProcessorManager:
manager.prepare_file_queue(known_files={"testing": {known_file}})
- assert manager._file_queue == deque()
+ assert manager._file_queue == OrderedDict()
@conf_vars({("dag_processor", "file_parsing_sort_mode"): "alphabetical"})
def
test_prepare_file_queue_skips_recently_processed_file_with_versioned_stats(self):
@@ -821,7 +821,7 @@ class TestDagFileProcessorManager:
manager.prepare_file_queue(known_files={"testing": {known_file}})
- assert manager._file_queue == deque()
+ assert manager._file_queue == OrderedDict()
@conf_vars({("dag_processor", "file_parsing_sort_mode"): "modified_time"})
@mock.patch("airflow.utils.file.os.path.getmtime")
@@ -843,7 +843,7 @@ class TestDagFileProcessorManager:
mock_getmtime.side_effect = [(freezed_base_time -
timedelta(seconds=5)).timestamp()]
manager.prepare_file_queue(known_files=known_files)
- assert manager._file_queue == deque([known_file])
+ assert manager._file_queue == OrderedDict.fromkeys([known_file])
assert known_file not in manager._file_stats
assert versioned_file in manager._file_stats
@@ -864,9 +864,9 @@ class TestDagFileProcessorManager:
manager = DagFileProcessorManager(max_runs=1)
manager._dag_bundles = list(DagBundlesManager().get_all_dag_bundles())
- manager._file_queue = deque([file2, file1])
+ manager._file_queue = OrderedDict.fromkeys([file2, file1])
manager._queue_requested_files_for_parsing()
- assert manager._file_queue == deque([file1, file2])
+ assert manager._file_queue == OrderedDict.fromkeys([file1, file2])
assert manager._force_refresh_bundles == {"dags-folder"}
with create_session() as session2:
parsing_request_after = session2.get(DagPriorityParsingRequest,
parsing_request.id)
@@ -888,7 +888,7 @@ class TestDagFileProcessorManager:
manager = DagFileProcessorManager(max_runs=1)
manager._dag_bundles = list(DagBundlesManager().get_all_dag_bundles())
manager._queue_requested_files_for_parsing()
- assert manager._file_queue == deque([file1])
+ assert manager._file_queue == OrderedDict.fromkeys([file1])
with create_session() as session2:
parsing_request_after =
session2.scalars(select(DagPriorityParsingRequest)).all()
assert len(parsing_request_after) == 1
@@ -907,11 +907,11 @@ class TestDagFileProcessorManager:
return [file1]
manager = ApiBackedManager(max_runs=1)
- manager._file_queue = deque([file2])
+ manager._file_queue = OrderedDict.fromkeys([file2])
manager._queue_requested_files_for_parsing()
- assert manager._file_queue == deque([file1, file2])
+ assert manager._file_queue == OrderedDict.fromkeys([file1, file2])
assert manager._force_refresh_bundles == {"dags-folder"}
def test_request_bundle_refresh_marks_bundles_for_refresh(self):
@@ -1758,7 +1758,7 @@ class TestDagFileProcessorManager:
manager._add_callback_to_queue(dag2_req1)
# then - requests should be in manager's queue, with dag2 ahead of
dag1 (because it was added last)
- assert manager._file_queue == deque([dag2_path, dag1_path])
+ assert manager._file_queue == OrderedDict.fromkeys([dag2_path,
dag1_path])
assert set(manager._callback_to_execute.keys()) == {
dag1_path,
dag2_path,
@@ -1766,12 +1766,12 @@ class TestDagFileProcessorManager:
assert manager._callback_to_execute[dag2_path] == [dag2_req1]
# update the queue, although the callback is registered
- assert manager._file_queue == deque([dag2_path, dag1_path])
+ assert manager._file_queue == OrderedDict.fromkeys([dag2_path,
dag1_path])
# when
manager._add_callback_to_queue(dag1_req2)
# Since dag1_req2 is same as dag1_req1, we now have 2 items in
file_path_queue
- assert manager._file_queue == deque([dag2_path, dag1_path])
+ assert manager._file_queue == OrderedDict.fromkeys([dag2_path,
dag1_path])
assert manager._callback_to_execute[dag1_path] == [
dag1_req1,
dag1_req2,