This is an automated email from the ASF dual-hosted git repository.
ephraimbuddy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 01be07a4571 Mark Dags stale when their bundle is removed from config (#66948)
01be07a4571 is described below
commit 01be07a4571e58c311048246788f76f837b145fb
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Fri May 15 11:11:40 2026 +0100
Mark Dags stale when their bundle is removed from config (#66948)
* Mark Dags stale when their bundle is removed from config
Previously, when a Dag bundle was removed from the bundle config, sync_bundles_to_db
flipped the bundle's active flag to False but left its Dags with
is_stale=False. The processor stops parsing files for inactive bundles,
so the time-based check in deactivate_stale_dags never fired for them.
deactivate_stale_dags now reads the names of inactive bundles (active=False)
from the DagBundleModel table and treats any non-stale Dag belonging to one
of those bundles as stale, in addition to the existing last_parsed_time check
for Dags in active bundles. If the bundle reappears in config later, the
existing parse path resets is_stale to False per Dag.
* Apply suggestions from code review
Co-authored-by: Wei Lee <[email protected]>
---------
Co-authored-by: Wei Lee <[email protected]>
---
airflow-core/src/airflow/dag_processing/manager.py | 16 +++++++--
.../tests/unit/dag_processing/test_manager.py | 40 ++++++++++++++++++++++
2 files changed, 54 insertions(+), 2 deletions(-)
diff --git a/airflow-core/src/airflow/dag_processing/manager.py
b/airflow-core/src/airflow/dag_processing/manager.py
index 3d2767785ab..72e51e48743 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -411,17 +411,29 @@ class DagFileProcessorManager(LoggingMixin):
):
"""Detect and deactivate DAGs which are no longer present in files."""
to_deactivate = set()
- bundle_names = {b.name for b in self._dag_bundles}
+ inactive_bundles = set(
+
session.scalars(select(DagBundleModel.name).where(DagBundleModel.active.is_(False))).all()
+ )
query = select(
DagModel.dag_id,
DagModel.bundle_name,
DagModel.fileloc,
DagModel.last_parsed_time,
DagModel.relative_fileloc,
- ).where(~DagModel.is_stale, DagModel.bundle_name.in_(bundle_names))
+ ).where(~DagModel.is_stale)
dags_parsed = session.execute(query)
for dag in dags_parsed:
+ # Dags whose bundle has been removed from config (bundle no longer
active) are stale —
+ # the processor has stopped parsing their files, so the time-based
check below would never fire.
+ if dag.bundle_name in inactive_bundles:
+ self.log.info(
+ "Deactivating Dag %s. Its bundle %s is no longer active.",
+ dag.dag_id,
+ dag.bundle_name,
+ )
+ to_deactivate.add(dag.dag_id)
+ continue
# When the Dag's last_parsed_time is more than the
stale_dag_threshold older than the
# Dag file's last_finish_time, the Dag is considered stale as has
apparently been removed from the file,
# This is especially relevant for Dag files that generate Dags in
a dynamic manner.
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py
b/airflow-core/tests/unit/dag_processing/test_manager.py
index 3c36ee066da..40b819f146e 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -1000,6 +1000,46 @@ class TestDagFileProcessorManager:
# SerializedDagModel gives history about Dags
assert serialized_dag_count == 1
+ @pytest.mark.usefixtures("testing_dag_bundle")
+ def test_deactivate_stale_dags_marks_dags_in_inactive_bundles(self,
session):
+ """Dags whose bundle is no longer active should be marked stale even
without a parse signal."""
+ session.add(DagBundleModel(name="gone-bundle"))
+ session.flush()
+ session.execute(
+ DagBundleModel.__table__.update().where(DagBundleModel.name ==
"gone-bundle").values(active=False)
+ )
+ session.add(
+ DagModel(
+ dag_id="dag_in_inactive_bundle",
+ bundle_name="gone-bundle",
+ relative_fileloc="some_file.py",
+ last_parsed_time=timezone.utcnow(),
+ is_stale=False,
+ )
+ )
+ session.add(
+ DagModel(
+ dag_id="dag_in_active_bundle",
+ bundle_name="testing",
+ relative_fileloc="other_file.py",
+ last_parsed_time=timezone.utcnow(),
+ is_stale=False,
+ )
+ )
+ session.flush()
+
+ manager = DagFileProcessorManager(max_runs=1, processor_timeout=10 *
60)
+ manager.deactivate_stale_dags(last_parsed={})
+
+ is_stale_by_dag = dict(
+ session.execute(
+ select(DagModel.dag_id, DagModel.is_stale).where(
+ DagModel.dag_id.in_(["dag_in_inactive_bundle",
"dag_in_active_bundle"])
+ )
+ ).all()
+ )
+ assert is_stale_by_dag == {"dag_in_inactive_bundle": True,
"dag_in_active_bundle": False}
+
@mock.patch("airflow.dag_processing.manager.BundleUsageTrackingManager")
def test_cleanup_stale_bundle_versions_interval(self, mock_bundle_manager):
manager = DagFileProcessorManager(max_runs=1)