This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ae062b9a75a Dag processor manager consumes DagPriorityParsingRequests (#48424)
ae062b9a75a is described below

commit ae062b9a75a803cc51fcba070cf0140ef1c106ac
Author: Jed Cunningham <[email protected]>
AuthorDate: Thu Mar 27 02:04:31 2025 -0600

    Dag processor manager consumes DagPriorityParsingRequests (#48424)
    
    The DAG processor manager now consumes DagPriorityParsingRequests, so the
    "reparse" action in the UI/API actually takes effect. Each request also
    forces a refresh of the DAG bundle that contains the requested file.
---
 airflow-core/src/airflow/dag_processing/manager.py | 63 +++++++++++-------
 .../tests/unit/dag_processing/test_manager.py      | 76 +++++++++++++++++++---
 2 files changed, 104 insertions(+), 35 deletions(-)

diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py
index 70829cd20ed..1dee1ce61dd 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -207,6 +207,8 @@ class DagFileProcessorManager(LoggingMixin):
     )
     _bundles_last_refreshed: float = attrs.field(default=0, init=False)
     """Last time we checked if any bundles are ready to be refreshed"""
+    _force_refresh_bundles: set[str] = attrs.field(factory=set, init=False)
+    """List of bundles that need to be force refreshed in the next loop"""
 
     def register_exit_signals(self):
         """Register signals that stop child processes."""
@@ -321,6 +323,8 @@ class DagFileProcessorManager(LoggingMixin):
 
             self._kill_timed_out_processors()
 
+            self._queue_requested_files_for_parsing()
+
             self._refresh_dag_bundles(known_files=known_files)
 
             if not self._file_queue:
@@ -333,8 +337,6 @@ class DagFileProcessorManager(LoggingMixin):
                 # if new files found in dag dir, add them
                 self.add_files_to_queue(known_files=known_files)
 
-            self._refresh_requested_filelocs()
-
             self._start_new_processes()
 
             self._service_processor_sockets(timeout=poll_time)
@@ -381,20 +383,40 @@ class DagFileProcessorManager(LoggingMixin):
                 self.selector.unregister(key.fileobj)
                 key.fileobj.close()  # type: ignore[union-attr]
 
-    def _refresh_requested_filelocs(self) -> None:
-        """Refresh filepaths from dag dir as requested by users via APIs."""
-        return
-        # TODO: AIP-66 make bundle aware - fileloc will be relative 
(eventually), thus not unique in order to know what file to repase
-        # Get values from DB table
-        filelocs = self._get_priority_filelocs()
-        for fileloc in filelocs:
-            # Try removing the fileloc if already present
+    def _queue_requested_files_for_parsing(self) -> None:
+        """Queue any files requested for parsing as requested by users via 
UI/API."""
+        files = self._get_priority_files()
+        bundles_to_refresh: set[str] = set()
+        for file in files:
+            # Try removing the file if already present
             try:
-                self._file_queue.remove(fileloc)
+                self._file_queue.remove(file)
             except ValueError:
                 pass
-            # enqueue fileloc to the start of the queue.
-            self._file_queue.appendleft(fileloc)
+            # enqueue file to the start of the queue.
+            self._file_queue.appendleft(file)
+            bundles_to_refresh.add(file.bundle_name)
+
+        self._force_refresh_bundles |= bundles_to_refresh
+        if self._force_refresh_bundles:
+            self.log.info("Bundles being force refreshed: %s", ", 
".join(self._force_refresh_bundles))
+
+    @provide_session
+    def _get_priority_files(self, session: Session = NEW_SESSION) -> 
list[DagFileInfo]:
+        files: list[DagFileInfo] = []
+        bundles = {b.name: b for b in self._dag_bundles}
+        requests = session.scalars(
+            
select(DagPriorityParsingRequest).where(DagPriorityParsingRequest.bundle_name.in_(bundles.keys()))
+        )
+        for request in requests:
+            bundle = bundles[request.bundle_name]
+            files.append(
+                DagFileInfo(
+                    rel_path=Path(request.relative_fileloc), 
bundle_name=bundle.name, bundle_path=bundle.path
+                )
+            )
+            session.delete(request)
+        return files
 
     @provide_session
     @retry_db_transaction
@@ -439,17 +461,6 @@ class DagFileProcessorManager(LoggingMixin):
         self._add_files_to_queue([file_info], True)
         Stats.incr("dag_processing.other_callback_count")
 
-    @classmethod
-    @provide_session
-    def _get_priority_filelocs(cls, session: Session = NEW_SESSION):
-        """Get filelocs from DB table."""
-        filelocs: list[str] = []
-        requests = session.scalars(select(DagPriorityParsingRequest))
-        for request in requests:
-            filelocs.append(request.fileloc)
-            session.delete(request)
-        return filelocs
-
     def _refresh_dag_bundles(self, known_files: dict[str, set[DagFileInfo]]):
         """Refresh DAG bundles, if required."""
         now = timezone.utcnow()
@@ -457,7 +468,7 @@ class DagFileProcessorManager(LoggingMixin):
         # we don't need to check if it's time to refresh every loop - that is 
way too often
         next_check = self._bundles_last_refreshed + 
self.bundle_refresh_check_interval
         now_seconds = time.monotonic()
-        if now_seconds < next_check:
+        if now_seconds < next_check and not self._force_refresh_bundles:
             self.log.debug(
                 "Not time to check if DAG Bundles need refreshed yet - 
skipping. Next check in %.2f seconds",
                 next_check - now_seconds,
@@ -497,6 +508,7 @@ class DagFileProcessorManager(LoggingMixin):
                     elapsed_time_since_refresh < bundle.refresh_interval
                     and current_version_matches_db
                     and previously_seen
+                    and bundle.name not in self._force_refresh_bundles
                 ):
                     self.log.info("Not time to refresh bundle %s", bundle.name)
                     continue
@@ -510,6 +522,7 @@ class DagFileProcessorManager(LoggingMixin):
                     continue
 
                 bundle_model.last_refreshed = now
+                self._force_refresh_bundles.discard(bundle.name)
 
                 if bundle.supports_versioning:
                     # We can short-circuit the rest of this if (1) bundle was 
seen before by
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py
index 027d58e6fa1..387b45b619f 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -390,28 +390,53 @@ class TestDagFileProcessorManager:
                 > (freezed_base_time - 
manager._file_stats[dag_file].last_finish_time).total_seconds()
             )
 
-    @pytest.mark.skip("AIP-66: parsing requests are not bundle aware yet")
     def test_file_paths_in_queue_sorted_by_priority(self):
         from airflow.models.dagbag import DagPriorityParsingRequest
 
-        parsing_request = DagPriorityParsingRequest(fileloc="file_1.py")
+        parsing_request = 
DagPriorityParsingRequest(relative_fileloc="file_1.py", 
bundle_name="dags-folder")
         with create_session() as session:
             session.add(parsing_request)
             session.commit()
 
-        """Test dag files are sorted by priority"""
-        dag_files = ["file_3.py", "file_2.py", "file_4.py", "file_1.py"]
-
-        manager = DagFileProcessorManager(dag_directory="directory", 
max_runs=1)
+        file1 = DagFileInfo(
+            bundle_name="dags-folder", rel_path=Path("file_1.py"), 
bundle_path=TEST_DAGS_FOLDER
+        )
+        file2 = DagFileInfo(
+            bundle_name="dags-folder", rel_path=Path("file_2.py"), 
bundle_path=TEST_DAGS_FOLDER
+        )
 
-        manager.handle_removed_files(dag_files)
-        manager._file_queue = deque(["file_2.py", "file_3.py", "file_4.py", 
"file_1.py"])
-        manager._refresh_requested_filelocs()
-        assert manager._file_queue == deque(["file_1.py", "file_2.py", 
"file_3.py", "file_4.py"])
+        manager = DagFileProcessorManager(max_runs=1)
+        manager._dag_bundles = list(DagBundlesManager().get_all_dag_bundles())
+        manager._file_queue = deque([file2, file1])
+        manager._queue_requested_files_for_parsing()
+        assert manager._file_queue == deque([file1, file2])
+        assert manager._force_refresh_bundles == {"dags-folder"}
         with create_session() as session2:
             parsing_request_after = 
session2.query(DagPriorityParsingRequest).get(parsing_request.id)
         assert parsing_request_after is None
 
+    def test_parsing_requests_only_bundles_being_parsed(self, 
testing_dag_bundle):
+        """Ensure the manager only handles parsing requests for bundles being 
parsed in this manager"""
+        from airflow.models.dagbag import DagPriorityParsingRequest
+
+        with create_session() as session:
+            
session.add(DagPriorityParsingRequest(relative_fileloc="file_1.py", 
bundle_name="dags-folder"))
+            
session.add(DagPriorityParsingRequest(relative_fileloc="file_x.py", 
bundle_name="testing"))
+            session.commit()
+
+        file1 = DagFileInfo(
+            bundle_name="dags-folder", rel_path=Path("file_1.py"), 
bundle_path=TEST_DAGS_FOLDER
+        )
+
+        manager = DagFileProcessorManager(max_runs=1)
+        manager._dag_bundles = list(DagBundlesManager().get_all_dag_bundles())
+        manager._queue_requested_files_for_parsing()
+        assert manager._file_queue == deque([file1])
+        with create_session() as session2:
+            parsing_request_after = 
session2.query(DagPriorityParsingRequest).all()
+        assert len(parsing_request_after) == 1
+        assert parsing_request_after[0].relative_fileloc == "file_x.py"
+
     def test_scan_stale_dags(self, testing_dag_bundle):
         """
         Ensure that DAGs are marked inactive when the file is parsed but the
@@ -991,6 +1016,37 @@ class TestDagFileProcessorManager:
             manager._refresh_dag_bundles({})
             assert bundleone.refresh.call_count == 1  # didn't fresh the 
second time
 
+    def test_bundle_force_refresh(self):
+        """Ensure the dag processor honors force refreshing a bundle."""
+        config = [
+            {
+                "name": "bundleone",
+                "classpath": 
"airflow.dag_processing.bundles.local.LocalDagBundle",
+                "kwargs": {"path": "/dev/null", "refresh_interval": 0},
+            },
+        ]
+
+        bundleone = MagicMock()
+        bundleone.name = "bundleone"
+        bundleone.path = "/dev/null"
+        bundleone.refresh_interval = 0
+        bundleone.get_current_version.return_value = None
+
+        with conf_vars(
+            {
+                ("dag_processor", "dag_bundle_config_list"): 
json.dumps(config),
+                ("dag_processor", "bundle_refresh_check_interval"): "10",
+            }
+        ):
+            DagBundlesManager().sync_bundles_to_db()
+            manager = DagFileProcessorManager(max_runs=2)
+            manager._dag_bundles = [bundleone]
+            manager._refresh_dag_bundles({})
+            assert bundleone.refresh.call_count == 1
+            manager._force_refresh_bundles = {"bundleone"}
+            manager._refresh_dag_bundles({})
+            assert bundleone.refresh.call_count == 2  # forced refresh
+
     def test_bundles_versions_are_stored(self, session):
         config = [
             {

Reply via email to