This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 3b97c94376a Fix dag processor crash by ignoring callbacks from other
bundles (#57192)
3b97c94376a is described below
commit 3b97c94376af0e793451d03b7e775ea47bd324a8
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Fri Oct 24 17:17:41 2025 +0100
Fix dag processor crash by ignoring callbacks from other bundles (#57192)
When the dag processor was restricted to running specific bundles, it still
attempted to process callback requests for bundles it did not own, leading to a
StopIteration in render_log_filename due to missing bundle references.
Now, _fetch_callbacks only enqueues callbacks whose bundle_name matches
one of the manager’s active bundles, leaving the others in the DB for the
correct processor to handle.
closes: #57081
---
airflow-core/src/airflow/dag_processing/manager.py | 6 ++-
.../tests/unit/dag_processing/test_manager.py | 46 ++++++++++++++++++++++
2 files changed, 51 insertions(+), 1 deletion(-)
diff --git a/airflow-core/src/airflow/dag_processing/manager.py
b/airflow-core/src/airflow/dag_processing/manager.py
index 93046d9a78f..84a99a480a9 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -451,6 +451,7 @@ class DagFileProcessorManager(LoggingMixin):
callback_queue: list[CallbackRequest] = []
with prohibit_commit(session) as guard:
+ bundle_names = [bundle.name for bundle in self._dag_bundles]
query: Select[tuple[DbCallbackRequest]] = select(DbCallbackRequest)
query =
query.order_by(DbCallbackRequest.priority_weight.desc()).limit(
self.max_callbacks_per_loop
@@ -461,8 +462,11 @@ class DagFileProcessorManager(LoggingMixin):
)
callbacks = session.scalars(query)
for callback in callbacks:
+ req = callback.get_callback_request()
+ if req.bundle_name not in bundle_names:
+ continue
try:
- callback_queue.append(callback.get_callback_request())
+ callback_queue.append(req)
session.delete(callback)
except Exception as e:
self.log.warning("Error adding callback for execution: %s,
%s", callback, e)
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py
b/airflow-core/tests/unit/dag_processing/test_manager.py
index e3ff30ae070..205eaa4aaed 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -772,6 +772,7 @@ class TestDagFileProcessorManager:
with configure_testing_dag_bundle(dag_filepath):
manager = DagFileProcessorManager(max_runs=1)
+ manager._dag_bundles =
list(DagBundlesManager().get_all_dag_bundles())
with create_session() as session:
callbacks = manager._fetch_callbacks(session=session)
@@ -815,6 +816,51 @@ class TestDagFileProcessorManager:
manager.run()
assert session.query(DbCallbackRequest).count() == 1
+ @conf_vars({("core", "load_examples"): "False"})
+ def test_fetch_callbacks_ignores_other_bundles(self,
configure_testing_dag_bundle):
+ """Ensure callbacks for bundles not owned by current dag processor
manager are ignored and not deleted."""
+
+ dag_filepath = TEST_DAG_FOLDER / "test_on_failure_callback_dag.py"
+
+ # Create two callbacks: one for the active 'testing' bundle and one
for a different bundle
+ matching = DagCallbackRequest(
+ dag_id="test_start_date_scheduling",
+ bundle_name="testing",
+ bundle_version=None,
+ filepath="test_on_failure_callback_dag.py",
+ is_failure_callback=True,
+ run_id="match",
+ )
+ non_matching = DagCallbackRequest(
+ dag_id="test_start_date_scheduling",
+ bundle_name="other-bundle",
+ bundle_version=None,
+ filepath="test_on_failure_callback_dag.py",
+ is_failure_callback=True,
+ run_id="no-match",
+ )
+
+ with create_session() as session:
+ session.add(DbCallbackRequest(callback=matching,
priority_weight=100))
+ session.add(DbCallbackRequest(callback=non_matching,
priority_weight=200))
+
+ with configure_testing_dag_bundle(dag_filepath):
+ manager = DagFileProcessorManager(max_runs=1)
+ manager._dag_bundles =
list(DagBundlesManager().get_all_dag_bundles())
+
+ with create_session() as session:
+ callbacks = manager._fetch_callbacks(session=session)
+
+ # Only the matching callback should be returned
+ assert [c.run_id for c in callbacks] == ["match"]
+
+ # The non-matching callback should remain in the DB
+ remaining = session.query(DbCallbackRequest).all()
+ assert len(remaining) == 1
+ # Decode remaining request and verify it's for the other bundle
+ remaining_req = remaining[0].get_callback_request()
+ assert remaining_req.bundle_name == "other-bundle"
+
@mock.patch.object(DagFileProcessorManager, "_get_logger_for_dag_file")
def test_callback_queue(self, mock_get_logger,
configure_testing_dag_bundle):
mock_logger = MagicMock()