This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9ad468cb955 Restore `deactivate_deleted_dags` signature broken by #63617 (#64245)
9ad468cb955 is described below

commit 9ad468cb955ee24d8ba9f3a5237b4219f1aa0383
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Thu Mar 26 15:35:49 2026 +0100

    Restore `deactivate_deleted_dags` signature broken by #63617 (#64245)
    
    The zip import error fix (#63617) changed the public signature of
    `DagFileProcessorManager.deactivate_deleted_dags` from
    `(bundle_name, present: set[DagFileInfo])` to
    `(bundle_name, observed_filelocs: set[str])`, breaking subclass
    overrides. Restore the original signature and compute observed
    filelocs internally.
    
    Also widen `DagModel.deactivate_deleted_dags` `rel_filelocs` type
    from `set[str]` to `Collection[str]` to accept both list and set
    callers.
---
 airflow-core/src/airflow/dag_processing/manager.py |  8 ++---
 airflow-core/src/airflow/models/dag.py             |  3 +-
 .../tests/unit/dag_processing/test_manager.py      | 35 ++++++++++++++++++++--
 airflow-core/tests/unit/models/test_dag.py         |  2 +-
 4 files changed, 40 insertions(+), 8 deletions(-)

diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py
index d1194127599..76f641da0f3 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -739,11 +739,10 @@ class DagFileProcessorManager(LoggingMixin):
 
             known_files[bundle.name] = found_files
 
-            observed_filelocs = self._get_observed_filelocs(found_files)
-            self.deactivate_deleted_dags(bundle_name=bundle.name, observed_filelocs=observed_filelocs)
+            self.deactivate_deleted_dags(bundle_name=bundle.name, present=found_files)
             self.clear_orphaned_import_errors(
                 bundle_name=bundle.name,
-                observed_filelocs=observed_filelocs,
+                observed_filelocs=self._get_observed_filelocs(found_files),
             )
 
         if any_refreshed:
@@ -793,8 +792,9 @@ class DagFileProcessorManager(LoggingMixin):
 
         return observed_filelocs
 
-    def deactivate_deleted_dags(self, bundle_name: str, observed_filelocs: set[str]) -> None:
+    def deactivate_deleted_dags(self, bundle_name: str, present: set[DagFileInfo]) -> None:
         """Deactivate DAGs that come from files that are no longer present in bundle."""
+        observed_filelocs = self._get_observed_filelocs(present)
         with create_session() as session:
             any_deactivated = DagModel.deactivate_deleted_dags(
                 bundle_name=bundle_name,
diff --git a/airflow-core/src/airflow/models/dag.py b/airflow-core/src/airflow/models/dag.py
index d4a74b6cc24..c98f15af910 100644
--- a/airflow-core/src/airflow/models/dag.py
+++ b/airflow-core/src/airflow/models/dag.py
@@ -588,7 +588,7 @@ class DagModel(Base):
     def deactivate_deleted_dags(
         cls,
         bundle_name: str,
-        rel_filelocs: set[str],
+        rel_filelocs: Collection[str],
         session: Session = NEW_SESSION,
     ) -> bool:
         """
@@ -600,6 +600,7 @@ class DagModel(Base):
         :return: True if any DAGs were marked as stale, False otherwise
         """
         log.debug("Deactivating DAGs (for which DAG files are deleted) from %s table ", cls.__tablename__)
+        rel_filelocs = set(rel_filelocs)
         dag_models = session.scalars(
             select(cls)
             .where(
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py
index ea867ad1fec..46f23e6e3b5 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -309,6 +309,37 @@ class TestDagFileProcessorManager:
         assert len(import_errors) == 1
         assert import_errors[0].filename == "test_zip.zip/broken_dag.py"
 
+    def test_refresh_dag_bundles_calls_legacy_deactivate_deleted_dags_override(
+        self, tmp_path, configure_dag_bundles
+    ):
+        bundle_path = tmp_path / "bundleone"
+        bundle_path.mkdir()
+        dag_path = bundle_path / "test_dag.py"
+        dag_path.write_text("from airflow.sdk import DAG\n")
+
+        class BackwardCompatibleManager(DagFileProcessorManager):
+            seen_bundle_name: str | None = None
+            seen_present: set[DagFileInfo] | None = None
+
+            def deactivate_deleted_dags(self, bundle_name: str, present: set[DagFileInfo]) -> None:
+                self.seen_bundle_name = bundle_name
+                self.seen_present = present
+
+        with configure_dag_bundles({"bundleone": bundle_path}):
+            DagBundlesManager().sync_bundles_to_db()
+            manager = BackwardCompatibleManager(max_runs=1)
+            manager._dag_bundles = list(DagBundlesManager().get_all_dag_bundles())
+            manager._refresh_dag_bundles({})
+
+        assert manager.seen_bundle_name == "bundleone"
+        assert manager.seen_present == {
+            DagFileInfo(
+                bundle_name="bundleone",
+                rel_path=Path("test_dag.py"),
+                bundle_path=bundle_path,
+            )
+        }
+
     @conf_vars({("core", "load_examples"): "False"})
     def test_max_runs_when_no_files(self, tmp_path):
         with conf_vars({("core", "dags_folder"): str(tmp_path)}):
@@ -999,7 +1030,7 @@ class TestDagFileProcessorManager:
         ]
 
         manager = DagFileProcessorManager(max_runs=1)
-        manager.deactivate_deleted_dags("dag_maker", manager._get_observed_filelocs(set(active_files)))
+        manager.deactivate_deleted_dags("dag_maker", set(active_files))
 
         # The DAG from test_dag1.py is still active
         assert session.get(DagModel, "test_dag1").is_stale is False
@@ -1090,7 +1121,7 @@ class TestDagFileProcessorManager:
         dag_maker.sync_dagbag_to_db()
 
         manager = DagFileProcessorManager(max_runs=1)
-        manager.deactivate_deleted_dags("dag_maker", manager._get_observed_filelocs(set(active_files)))
+        manager.deactivate_deleted_dags("dag_maker", set(active_files))
 
         if should_call_cleanup:
             mock_remove_references.assert_called_once()
diff --git a/airflow-core/tests/unit/models/test_dag.py b/airflow-core/tests/unit/models/test_dag.py
index 11a1c4818fc..046c85ea799 100644
--- a/airflow-core/tests/unit/models/test_dag.py
+++ b/airflow-core/tests/unit/models/test_dag.py
@@ -842,7 +842,7 @@ class TestDag:
 
         DagModel.deactivate_deleted_dags(
             bundle_name=orm_dag.bundle_name,
-            rel_filelocs=set(list_py_file_paths(settings.DAGS_FOLDER)),
+            rel_filelocs=list_py_file_paths(settings.DAGS_FOLDER),
         )
 
         orm_dag = session.scalar(select(DagModel).where(DagModel.dag_id == dag_id))

Reply via email to