This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9ad468cb955 Restore `deactivate_deleted_dags` signature broken by #63617 (#64245)
9ad468cb955 is described below
commit 9ad468cb955ee24d8ba9f3a5237b4219f1aa0383
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Thu Mar 26 15:35:49 2026 +0100
Restore `deactivate_deleted_dags` signature broken by #63617 (#64245)
The zip import error fix (#63617) changed the public signature of
`DagFileProcessorManager.deactivate_deleted_dags` from
`(bundle_name, present: set[DagFileInfo])` to
`(bundle_name, observed_filelocs: set[str])`, breaking subclass
overrides. Restore the original signature and compute observed
filelocs internally.
Also widen `DagModel.deactivate_deleted_dags` `rel_filelocs` type
from `set[str]` to `Collection[str]` to accept both list and set
callers.
---
airflow-core/src/airflow/dag_processing/manager.py | 8 ++---
airflow-core/src/airflow/models/dag.py | 3 +-
.../tests/unit/dag_processing/test_manager.py | 35 ++++++++++++++++++++--
airflow-core/tests/unit/models/test_dag.py | 2 +-
4 files changed, 40 insertions(+), 8 deletions(-)
diff --git a/airflow-core/src/airflow/dag_processing/manager.py
b/airflow-core/src/airflow/dag_processing/manager.py
index d1194127599..76f641da0f3 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -739,11 +739,10 @@ class DagFileProcessorManager(LoggingMixin):
known_files[bundle.name] = found_files
- observed_filelocs = self._get_observed_filelocs(found_files)
- self.deactivate_deleted_dags(bundle_name=bundle.name, observed_filelocs=observed_filelocs)
+ self.deactivate_deleted_dags(bundle_name=bundle.name, present=found_files)
self.clear_orphaned_import_errors(
bundle_name=bundle.name,
- observed_filelocs=observed_filelocs,
+ observed_filelocs=self._get_observed_filelocs(found_files),
)
if any_refreshed:
@@ -793,8 +792,9 @@ class DagFileProcessorManager(LoggingMixin):
return observed_filelocs
- def deactivate_deleted_dags(self, bundle_name: str, observed_filelocs: set[str]) -> None:
+ def deactivate_deleted_dags(self, bundle_name: str, present: set[DagFileInfo]) -> None:
"""Deactivate DAGs that come from files that are no longer present in bundle."""
+ observed_filelocs = self._get_observed_filelocs(present)
with create_session() as session:
any_deactivated = DagModel.deactivate_deleted_dags(
bundle_name=bundle_name,
diff --git a/airflow-core/src/airflow/models/dag.py
b/airflow-core/src/airflow/models/dag.py
index d4a74b6cc24..c98f15af910 100644
--- a/airflow-core/src/airflow/models/dag.py
+++ b/airflow-core/src/airflow/models/dag.py
@@ -588,7 +588,7 @@ class DagModel(Base):
def deactivate_deleted_dags(
cls,
bundle_name: str,
- rel_filelocs: set[str],
+ rel_filelocs: Collection[str],
session: Session = NEW_SESSION,
) -> bool:
"""
@@ -600,6 +600,7 @@ class DagModel(Base):
:return: True if any DAGs were marked as stale, False otherwise
"""
log.debug("Deactivating DAGs (for which DAG files are deleted) from %s table ", cls.__tablename__)
+ rel_filelocs = set(rel_filelocs)
dag_models = session.scalars(
select(cls)
.where(
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py
b/airflow-core/tests/unit/dag_processing/test_manager.py
index ea867ad1fec..46f23e6e3b5 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -309,6 +309,37 @@ class TestDagFileProcessorManager:
assert len(import_errors) == 1
assert import_errors[0].filename == "test_zip.zip/broken_dag.py"
+ def test_refresh_dag_bundles_calls_legacy_deactivate_deleted_dags_override(
+ self, tmp_path, configure_dag_bundles
+ ):
+ bundle_path = tmp_path / "bundleone"
+ bundle_path.mkdir()
+ dag_path = bundle_path / "test_dag.py"
+ dag_path.write_text("from airflow.sdk import DAG\n")
+
+ class BackwardCompatibleManager(DagFileProcessorManager):
+ seen_bundle_name: str | None = None
+ seen_present: set[DagFileInfo] | None = None
+
+ def deactivate_deleted_dags(self, bundle_name: str, present: set[DagFileInfo]) -> None:
+ self.seen_bundle_name = bundle_name
+ self.seen_present = present
+
+ with configure_dag_bundles({"bundleone": bundle_path}):
+ DagBundlesManager().sync_bundles_to_db()
+ manager = BackwardCompatibleManager(max_runs=1)
+ manager._dag_bundles = list(DagBundlesManager().get_all_dag_bundles())
+ manager._refresh_dag_bundles({})
+
+ assert manager.seen_bundle_name == "bundleone"
+ assert manager.seen_present == {
+ DagFileInfo(
+ bundle_name="bundleone",
+ rel_path=Path("test_dag.py"),
+ bundle_path=bundle_path,
+ )
+ }
+
@conf_vars({("core", "load_examples"): "False"})
def test_max_runs_when_no_files(self, tmp_path):
with conf_vars({("core", "dags_folder"): str(tmp_path)}):
@@ -999,7 +1030,7 @@ class TestDagFileProcessorManager:
]
manager = DagFileProcessorManager(max_runs=1)
- manager.deactivate_deleted_dags("dag_maker", manager._get_observed_filelocs(set(active_files)))
+ manager.deactivate_deleted_dags("dag_maker", set(active_files))
# The DAG from test_dag1.py is still active
assert session.get(DagModel, "test_dag1").is_stale is False
@@ -1090,7 +1121,7 @@ class TestDagFileProcessorManager:
dag_maker.sync_dagbag_to_db()
manager = DagFileProcessorManager(max_runs=1)
- manager.deactivate_deleted_dags("dag_maker", manager._get_observed_filelocs(set(active_files)))
+ manager.deactivate_deleted_dags("dag_maker", set(active_files))
if should_call_cleanup:
mock_remove_references.assert_called_once()
diff --git a/airflow-core/tests/unit/models/test_dag.py
b/airflow-core/tests/unit/models/test_dag.py
index 11a1c4818fc..046c85ea799 100644
--- a/airflow-core/tests/unit/models/test_dag.py
+++ b/airflow-core/tests/unit/models/test_dag.py
@@ -842,7 +842,7 @@ class TestDag:
DagModel.deactivate_deleted_dags(
bundle_name=orm_dag.bundle_name,
- rel_filelocs=set(list_py_file_paths(settings.DAGS_FOLDER)),
+ rel_filelocs=list_py_file_paths(settings.DAGS_FOLDER),
)
orm_dag = session.scalar(select(DagModel).where(DagModel.dag_id == dag_id))