This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3b97c94376a Fix dag processor crash by ignoring callbacks from other bundles (#57192)
3b97c94376a is described below

commit 3b97c94376af0e793451d03b7e775ea47bd324a8
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Fri Oct 24 17:17:41 2025 +0100

    Fix dag processor crash by ignoring callbacks from other bundles (#57192)
    
    When the dag processor is configured to run a specific bundle, it
    previously attempted to process callback requests for bundles it does not
    own, leading to a StopIteration in render_log_filename due to missing
    bundle references.
    
    Now, fetch_callbacks only enqueues callbacks whose bundle_name matches
    one of the manager’s active bundles, leaving others in the DB for the
    correct processor to handle.
    
    closes: #57081
---
 airflow-core/src/airflow/dag_processing/manager.py |  6 ++-
 .../tests/unit/dag_processing/test_manager.py      | 46 ++++++++++++++++++++++
 2 files changed, 51 insertions(+), 1 deletion(-)

diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py
index 93046d9a78f..84a99a480a9 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -451,6 +451,7 @@ class DagFileProcessorManager(LoggingMixin):
 
         callback_queue: list[CallbackRequest] = []
         with prohibit_commit(session) as guard:
+            bundle_names = [bundle.name for bundle in self._dag_bundles]
             query: Select[tuple[DbCallbackRequest]] = select(DbCallbackRequest)
             query = 
query.order_by(DbCallbackRequest.priority_weight.desc()).limit(
                 self.max_callbacks_per_loop
@@ -461,8 +462,11 @@ class DagFileProcessorManager(LoggingMixin):
             )
             callbacks = session.scalars(query)
             for callback in callbacks:
+                req = callback.get_callback_request()
+                if req.bundle_name not in bundle_names:
+                    continue
                 try:
-                    callback_queue.append(callback.get_callback_request())
+                    callback_queue.append(req)
                     session.delete(callback)
                 except Exception as e:
                     self.log.warning("Error adding callback for execution: %s, 
%s", callback, e)
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py
index e3ff30ae070..205eaa4aaed 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -772,6 +772,7 @@ class TestDagFileProcessorManager:
 
         with configure_testing_dag_bundle(dag_filepath):
             manager = DagFileProcessorManager(max_runs=1)
+            manager._dag_bundles = 
list(DagBundlesManager().get_all_dag_bundles())
 
             with create_session() as session:
                 callbacks = manager._fetch_callbacks(session=session)
@@ -815,6 +816,51 @@ class TestDagFileProcessorManager:
                 manager.run()
                 assert session.query(DbCallbackRequest).count() == 1
 
+    @conf_vars({("core", "load_examples"): "False"})
+    def test_fetch_callbacks_ignores_other_bundles(self, 
configure_testing_dag_bundle):
+        """Ensure callbacks for bundles not owned by current dag processor 
manager are ignored and not deleted."""
+
+        dag_filepath = TEST_DAG_FOLDER / "test_on_failure_callback_dag.py"
+
+        # Create two callbacks: one for the active 'testing' bundle and one 
for a different bundle
+        matching = DagCallbackRequest(
+            dag_id="test_start_date_scheduling",
+            bundle_name="testing",
+            bundle_version=None,
+            filepath="test_on_failure_callback_dag.py",
+            is_failure_callback=True,
+            run_id="match",
+        )
+        non_matching = DagCallbackRequest(
+            dag_id="test_start_date_scheduling",
+            bundle_name="other-bundle",
+            bundle_version=None,
+            filepath="test_on_failure_callback_dag.py",
+            is_failure_callback=True,
+            run_id="no-match",
+        )
+
+        with create_session() as session:
+            session.add(DbCallbackRequest(callback=matching, 
priority_weight=100))
+            session.add(DbCallbackRequest(callback=non_matching, 
priority_weight=200))
+
+        with configure_testing_dag_bundle(dag_filepath):
+            manager = DagFileProcessorManager(max_runs=1)
+            manager._dag_bundles = 
list(DagBundlesManager().get_all_dag_bundles())
+
+            with create_session() as session:
+                callbacks = manager._fetch_callbacks(session=session)
+
+                # Only the matching callback should be returned
+                assert [c.run_id for c in callbacks] == ["match"]
+
+                # The non-matching callback should remain in the DB
+                remaining = session.query(DbCallbackRequest).all()
+                assert len(remaining) == 1
+                # Decode remaining request and verify it's for the other bundle
+                remaining_req = remaining[0].get_callback_request()
+                assert remaining_req.bundle_name == "other-bundle"
+
     @mock.patch.object(DagFileProcessorManager, "_get_logger_for_dag_file")
     def test_callback_queue(self, mock_get_logger, 
configure_testing_dag_bundle):
         mock_logger = MagicMock()

Reply via email to