This is an automated email from the ASF dual-hosted git repository.

ephraimbuddy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 01be07a4571 Mark Dags stale when their bundle is removed from config (#66948)
01be07a4571 is described below

commit 01be07a4571e58c311048246788f76f837b145fb
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Fri May 15 11:11:40 2026 +0100

    Mark Dags stale when their bundle is removed from config (#66948)
    
    * Mark Dags stale when their bundle is removed from config
    
    When a Dag bundle is removed from the bundle config, sync_bundles_to_db
    flipped the bundle's active flag to False but left its Dags with
    is_stale=False. The processor stops parsing files for inactive bundles,
    so the time-based check in deactivate_stale_dags never fired for them.
    
    deactivate_stale_dags now reads the set of inactive bundles from the
    DagBundleModel table and treats any non-stale Dag whose bundle is in
    that set as stale, in addition to the existing last_parsed_time check
    for the remaining Dags. If the bundle reappears in config later, the
    existing parse path resets is_stale to False per Dag.
    
    * Apply suggestions from code review
    
    Co-authored-by: Wei Lee <[email protected]>
    
    ---------
    
    Co-authored-by: Wei Lee <[email protected]>
---
 airflow-core/src/airflow/dag_processing/manager.py | 16 +++++++--
 .../tests/unit/dag_processing/test_manager.py      | 40 ++++++++++++++++++++++
 2 files changed, 54 insertions(+), 2 deletions(-)

diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py
index 3d2767785ab..72e51e48743 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -411,17 +411,29 @@ class DagFileProcessorManager(LoggingMixin):
     ):
         """Detect and deactivate DAGs which are no longer present in files."""
         to_deactivate = set()
-        bundle_names = {b.name for b in self._dag_bundles}
+        inactive_bundles = set(
+            session.scalars(select(DagBundleModel.name).where(DagBundleModel.active.is_(False))).all()
+        )
         query = select(
             DagModel.dag_id,
             DagModel.bundle_name,
             DagModel.fileloc,
             DagModel.last_parsed_time,
             DagModel.relative_fileloc,
-        ).where(~DagModel.is_stale, DagModel.bundle_name.in_(bundle_names))
+        ).where(~DagModel.is_stale)
         dags_parsed = session.execute(query)
 
         for dag in dags_parsed:
+            # Dags whose bundle has been removed from config (bundle no longer active) are stale —
+            # the processor has stopped parsing their files, so the time-based check below would never fire.
+            if dag.bundle_name in inactive_bundles:
+                self.log.info(
+                    "Deactivating Dag %s. Its bundle %s is no longer active.",
+                    dag.dag_id,
+                    dag.bundle_name,
+                )
+                to_deactivate.add(dag.dag_id)
+                continue
             # When the Dag's last_parsed_time is more than the stale_dag_threshold older than the
             # Dag file's last_finish_time, the Dag is considered stale as has apparently been removed from the file,
             # This is especially relevant for Dag files that generate Dags in a dynamic manner.
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py
index 3c36ee066da..40b819f146e 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -1000,6 +1000,46 @@ class TestDagFileProcessorManager:
         # SerializedDagModel gives history about Dags
         assert serialized_dag_count == 1
 
+    @pytest.mark.usefixtures("testing_dag_bundle")
+    def test_deactivate_stale_dags_marks_dags_in_inactive_bundles(self, session):
+        """Dags whose bundle is no longer active should be marked stale even without a parse signal."""
+        session.add(DagBundleModel(name="gone-bundle"))
+        session.flush()
+        session.execute(
+            DagBundleModel.__table__.update().where(DagBundleModel.name == "gone-bundle").values(active=False)
+        )
+        session.add(
+            DagModel(
+                dag_id="dag_in_inactive_bundle",
+                bundle_name="gone-bundle",
+                relative_fileloc="some_file.py",
+                last_parsed_time=timezone.utcnow(),
+                is_stale=False,
+            )
+        )
+        session.add(
+            DagModel(
+                dag_id="dag_in_active_bundle",
+                bundle_name="testing",
+                relative_fileloc="other_file.py",
+                last_parsed_time=timezone.utcnow(),
+                is_stale=False,
+            )
+        )
+        session.flush()
+
+        manager = DagFileProcessorManager(max_runs=1, processor_timeout=10 * 60)
+        manager.deactivate_stale_dags(last_parsed={})
+
+        is_stale_by_dag = dict(
+            session.execute(
+                select(DagModel.dag_id, DagModel.is_stale).where(
+                    DagModel.dag_id.in_(["dag_in_inactive_bundle", "dag_in_active_bundle"])
+                )
+            ).all()
+        )
+        assert is_stale_by_dag == {"dag_in_inactive_bundle": True, "dag_in_active_bundle": False}
+
     @mock.patch("airflow.dag_processing.manager.BundleUsageTrackingManager")
     def test_cleanup_stale_bundle_versions_interval(self, mock_bundle_manager):
         manager = DagFileProcessorManager(max_runs=1)

Reply via email to