This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ae062b9a75a Dag processor manager consumes DagPriorityParsingRequests
(#48424)
ae062b9a75a is described below
commit ae062b9a75a803cc51fcba070cf0140ef1c106ac
Author: Jed Cunningham <[email protected]>
AuthorDate: Thu Mar 27 02:04:31 2025 -0600
Dag processor manager consumes DagPriorityParsingRequests (#48424)
The manager will now consume DagPriorityParsingRequests, allowing the
"reparse" function from the UI/API to function. This also forces a
refresh of the bundle the DAG is in.
---
airflow-core/src/airflow/dag_processing/manager.py | 63 +++++++++++-------
.../tests/unit/dag_processing/test_manager.py | 76 +++++++++++++++++++---
2 files changed, 104 insertions(+), 35 deletions(-)
diff --git a/airflow-core/src/airflow/dag_processing/manager.py
b/airflow-core/src/airflow/dag_processing/manager.py
index 70829cd20ed..1dee1ce61dd 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -207,6 +207,8 @@ class DagFileProcessorManager(LoggingMixin):
)
_bundles_last_refreshed: float = attrs.field(default=0, init=False)
"""Last time we checked if any bundles are ready to be refreshed"""
+ _force_refresh_bundles: set[str] = attrs.field(factory=set, init=False)
+ """Set of bundles that need to be force refreshed in the next loop"""
def register_exit_signals(self):
"""Register signals that stop child processes."""
@@ -321,6 +323,8 @@ class DagFileProcessorManager(LoggingMixin):
self._kill_timed_out_processors()
+ self._queue_requested_files_for_parsing()
+
self._refresh_dag_bundles(known_files=known_files)
if not self._file_queue:
@@ -333,8 +337,6 @@ class DagFileProcessorManager(LoggingMixin):
# if new files found in dag dir, add them
self.add_files_to_queue(known_files=known_files)
- self._refresh_requested_filelocs()
-
self._start_new_processes()
self._service_processor_sockets(timeout=poll_time)
@@ -381,20 +383,40 @@ class DagFileProcessorManager(LoggingMixin):
self.selector.unregister(key.fileobj)
key.fileobj.close() # type: ignore[union-attr]
- def _refresh_requested_filelocs(self) -> None:
- """Refresh filepaths from dag dir as requested by users via APIs."""
- return
- # TODO: AIP-66 make bundle aware - fileloc will be relative
(eventually), thus not unique in order to know what file to reparse
- # Get values from DB table
- filelocs = self._get_priority_filelocs()
- for fileloc in filelocs:
- # Try removing the fileloc if already present
+ def _queue_requested_files_for_parsing(self) -> None:
+ """Queue any files requested for parsing as requested by users via
UI/API."""
+ files = self._get_priority_files()
+ bundles_to_refresh: set[str] = set()
+ for file in files:
+ # Try removing the file if already present
try:
- self._file_queue.remove(fileloc)
+ self._file_queue.remove(file)
except ValueError:
pass
- # enqueue fileloc to the start of the queue.
- self._file_queue.appendleft(fileloc)
+ # enqueue file to the start of the queue.
+ self._file_queue.appendleft(file)
+ bundles_to_refresh.add(file.bundle_name)
+
+ self._force_refresh_bundles |= bundles_to_refresh
+ if self._force_refresh_bundles:
+ self.log.info("Bundles being force refreshed: %s", ",
".join(self._force_refresh_bundles))
+
+ @provide_session
+ def _get_priority_files(self, session: Session = NEW_SESSION) ->
list[DagFileInfo]:
+ files: list[DagFileInfo] = []
+ bundles = {b.name: b for b in self._dag_bundles}
+ requests = session.scalars(
+
select(DagPriorityParsingRequest).where(DagPriorityParsingRequest.bundle_name.in_(bundles.keys()))
+ )
+ for request in requests:
+ bundle = bundles[request.bundle_name]
+ files.append(
+ DagFileInfo(
+ rel_path=Path(request.relative_fileloc),
bundle_name=bundle.name, bundle_path=bundle.path
+ )
+ )
+ session.delete(request)
+ return files
@provide_session
@retry_db_transaction
@@ -439,17 +461,6 @@ class DagFileProcessorManager(LoggingMixin):
self._add_files_to_queue([file_info], True)
Stats.incr("dag_processing.other_callback_count")
- @classmethod
- @provide_session
- def _get_priority_filelocs(cls, session: Session = NEW_SESSION):
- """Get filelocs from DB table."""
- filelocs: list[str] = []
- requests = session.scalars(select(DagPriorityParsingRequest))
- for request in requests:
- filelocs.append(request.fileloc)
- session.delete(request)
- return filelocs
-
def _refresh_dag_bundles(self, known_files: dict[str, set[DagFileInfo]]):
"""Refresh DAG bundles, if required."""
now = timezone.utcnow()
@@ -457,7 +468,7 @@ class DagFileProcessorManager(LoggingMixin):
# we don't need to check if it's time to refresh every loop - that is
way too often
next_check = self._bundles_last_refreshed +
self.bundle_refresh_check_interval
now_seconds = time.monotonic()
- if now_seconds < next_check:
+ if now_seconds < next_check and not self._force_refresh_bundles:
self.log.debug(
"Not time to check if DAG Bundles need refreshed yet -
skipping. Next check in %.2f seconds",
next_check - now_seconds,
@@ -497,6 +508,7 @@ class DagFileProcessorManager(LoggingMixin):
elapsed_time_since_refresh < bundle.refresh_interval
and current_version_matches_db
and previously_seen
+ and bundle.name not in self._force_refresh_bundles
):
self.log.info("Not time to refresh bundle %s", bundle.name)
continue
@@ -510,6 +522,7 @@ class DagFileProcessorManager(LoggingMixin):
continue
bundle_model.last_refreshed = now
+ self._force_refresh_bundles.discard(bundle.name)
if bundle.supports_versioning:
# We can short-circuit the rest of this if (1) bundle was
seen before by
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py
b/airflow-core/tests/unit/dag_processing/test_manager.py
index 027d58e6fa1..387b45b619f 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -390,28 +390,53 @@ class TestDagFileProcessorManager:
> (freezed_base_time -
manager._file_stats[dag_file].last_finish_time).total_seconds()
)
- @pytest.mark.skip("AIP-66: parsing requests are not bundle aware yet")
def test_file_paths_in_queue_sorted_by_priority(self):
from airflow.models.dagbag import DagPriorityParsingRequest
- parsing_request = DagPriorityParsingRequest(fileloc="file_1.py")
+ parsing_request =
DagPriorityParsingRequest(relative_fileloc="file_1.py",
bundle_name="dags-folder")
with create_session() as session:
session.add(parsing_request)
session.commit()
- """Test dag files are sorted by priority"""
- dag_files = ["file_3.py", "file_2.py", "file_4.py", "file_1.py"]
-
- manager = DagFileProcessorManager(dag_directory="directory",
max_runs=1)
+ file1 = DagFileInfo(
+ bundle_name="dags-folder", rel_path=Path("file_1.py"),
bundle_path=TEST_DAGS_FOLDER
+ )
+ file2 = DagFileInfo(
+ bundle_name="dags-folder", rel_path=Path("file_2.py"),
bundle_path=TEST_DAGS_FOLDER
+ )
- manager.handle_removed_files(dag_files)
- manager._file_queue = deque(["file_2.py", "file_3.py", "file_4.py",
"file_1.py"])
- manager._refresh_requested_filelocs()
- assert manager._file_queue == deque(["file_1.py", "file_2.py",
"file_3.py", "file_4.py"])
+ manager = DagFileProcessorManager(max_runs=1)
+ manager._dag_bundles = list(DagBundlesManager().get_all_dag_bundles())
+ manager._file_queue = deque([file2, file1])
+ manager._queue_requested_files_for_parsing()
+ assert manager._file_queue == deque([file1, file2])
+ assert manager._force_refresh_bundles == {"dags-folder"}
with create_session() as session2:
parsing_request_after =
session2.query(DagPriorityParsingRequest).get(parsing_request.id)
assert parsing_request_after is None
+ def test_parsing_requests_only_bundles_being_parsed(self,
testing_dag_bundle):
+ """Ensure the manager only handles parsing requests for bundles being
parsed in this manager"""
+ from airflow.models.dagbag import DagPriorityParsingRequest
+
+ with create_session() as session:
+
session.add(DagPriorityParsingRequest(relative_fileloc="file_1.py",
bundle_name="dags-folder"))
+
session.add(DagPriorityParsingRequest(relative_fileloc="file_x.py",
bundle_name="testing"))
+ session.commit()
+
+ file1 = DagFileInfo(
+ bundle_name="dags-folder", rel_path=Path("file_1.py"),
bundle_path=TEST_DAGS_FOLDER
+ )
+
+ manager = DagFileProcessorManager(max_runs=1)
+ manager._dag_bundles = list(DagBundlesManager().get_all_dag_bundles())
+ manager._queue_requested_files_for_parsing()
+ assert manager._file_queue == deque([file1])
+ with create_session() as session2:
+ parsing_request_after =
session2.query(DagPriorityParsingRequest).all()
+ assert len(parsing_request_after) == 1
+ assert parsing_request_after[0].relative_fileloc == "file_x.py"
+
def test_scan_stale_dags(self, testing_dag_bundle):
"""
Ensure that DAGs are marked inactive when the file is parsed but the
@@ -991,6 +1016,37 @@ class TestDagFileProcessorManager:
manager._refresh_dag_bundles({})
assert bundleone.refresh.call_count == 1 # didn't fresh the
second time
+ def test_bundle_force_refresh(self):
+ """Ensure the dag processor honors force refreshing a bundle."""
+ config = [
+ {
+ "name": "bundleone",
+ "classpath":
"airflow.dag_processing.bundles.local.LocalDagBundle",
+ "kwargs": {"path": "/dev/null", "refresh_interval": 0},
+ },
+ ]
+
+ bundleone = MagicMock()
+ bundleone.name = "bundleone"
+ bundleone.path = "/dev/null"
+ bundleone.refresh_interval = 0
+ bundleone.get_current_version.return_value = None
+
+ with conf_vars(
+ {
+ ("dag_processor", "dag_bundle_config_list"):
json.dumps(config),
+ ("dag_processor", "bundle_refresh_check_interval"): "10",
+ }
+ ):
+ DagBundlesManager().sync_bundles_to_db()
+ manager = DagFileProcessorManager(max_runs=2)
+ manager._dag_bundles = [bundleone]
+ manager._refresh_dag_bundles({})
+ assert bundleone.refresh.call_count == 1
+ manager._force_refresh_bundles = {"bundleone"}
+ manager._refresh_dag_bundles({})
+ assert bundleone.refresh.call_count == 2 # forced refresh
+
def test_bundles_versions_are_stored(self, session):
config = [
{