This is an automated email from the ASF dual-hosted git repository.
gopidesu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 447771d5f5b Fix callback starvation across DAG bundles (#63795)
447771d5f5b is described below
commit 447771d5f5b8e1ad3aa4b6464c4ef03af4c93485
Author: GPK <[email protected]>
AuthorDate: Wed Mar 18 12:32:32 2026 +0000
Fix callback starvation across DAG bundles (#63795)
* Fix callback starvation across DAG bundles
* Fix script
* rebase changes
* Update airflow-core/src/airflow/dag_processing/manager.py
Co-authored-by: Ephraim Anierobi <[email protected]>
* Remove newsfragment
---------
Co-authored-by: Ephraim Anierobi <[email protected]>
---
airflow-core/docs/migrations-ref.rst | 4 +-
airflow-core/src/airflow/dag_processing/manager.py | 3 +-
...0109_3_2_0_add_bundle_name_to_callback_table.py | 94 ++++++++++++++++++++++
airflow-core/src/airflow/models/callback.py | 6 +-
airflow-core/src/airflow/utils/db.py | 2 +-
.../tests/unit/dag_processing/test_manager.py | 55 +++++++++++++
airflow-core/tests/unit/models/test_callback.py | 23 +++++-
7 files changed, 181 insertions(+), 6 deletions(-)
diff --git a/airflow-core/docs/migrations-ref.rst
b/airflow-core/docs/migrations-ref.rst
index cde5b6b32a5..07d6f5afc16 100644
--- a/airflow-core/docs/migrations-ref.rst
+++ b/airflow-core/docs/migrations-ref.rst
@@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are
executed via when you ru
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description
|
+=========================+==================+===================+==============================================================+
-| ``888b59e02a5b`` (head) | ``6222ce48e289`` | ``3.2.0`` | Fix
migration file ORM inconsistencies. |
+| ``1d6611b6ab7c`` (head) | ``888b59e02a5b`` | ``3.2.0`` | Add
bundle_name to callback table. |
++-------------------------+------------------+-------------------+--------------------------------------------------------------+
+| ``888b59e02a5b`` | ``6222ce48e289`` | ``3.2.0`` | Fix
migration file ORM inconsistencies. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``6222ce48e289`` | ``134de42d3cb0`` | ``3.2.0`` | Add
partition fields to DagModel. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
diff --git a/airflow-core/src/airflow/dag_processing/manager.py
b/airflow-core/src/airflow/dag_processing/manager.py
index f8435659e1f..10b29c0560e 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -542,6 +542,7 @@ class DagFileProcessorManager(LoggingMixin):
bundle_names = [bundle.name for bundle in self._dag_bundles]
query: Select[tuple[DbCallbackRequest]] = with_row_locks(
select(DbCallbackRequest)
+ .where(DbCallbackRequest.bundle_name.in_(bundle_names))
.order_by(DbCallbackRequest.priority_weight.desc())
.limit(self.max_callbacks_per_loop),
of=DbCallbackRequest,
@@ -553,8 +554,6 @@ class DagFileProcessorManager(LoggingMixin):
]
for callback in callbacks:
req = callback.get_callback_request()
- if req.bundle_name not in bundle_names:
- continue
try:
callback_queue.append(req)
session.delete(callback)
diff --git
a/airflow-core/src/airflow/migrations/versions/0109_3_2_0_add_bundle_name_to_callback_table.py
b/airflow-core/src/airflow/migrations/versions/0109_3_2_0_add_bundle_name_to_callback_table.py
new file mode 100644
index 00000000000..c2fbc2641bd
--- /dev/null
+++
b/airflow-core/src/airflow/migrations/versions/0109_3_2_0_add_bundle_name_to_callback_table.py
@@ -0,0 +1,94 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Add bundle_name to callback table.
+
+Revision ID: 1d6611b6ab7c
+Revises: 888b59e02a5b
+Create Date: 2026-03-17 00:23:45.305588
+
+"""
+
+from __future__ import annotations
+
+import json
+
+import sqlalchemy as sa
+from alembic import context, op
+
+from airflow.migrations.db_types import StringID
+from airflow.utils.sqlalchemy import ExtendedJSON
+
+# revision identifiers, used by Alembic.
+revision = "1d6611b6ab7c"
+down_revision = "888b59e02a5b"
+branch_labels = None
+depends_on = None
+airflow_version = "3.2.0"
+
+
+def upgrade():
+ """Add bundle_name to callback rows and backfill dag-processor
callbacks."""
+ with op.batch_alter_table("callback", schema=None) as batch_op:
+ batch_op.add_column(sa.Column("bundle_name", StringID(),
nullable=True))
+
+ if context.is_offline_mode():
+ print(
+ """
+ -- WARNING: Unable to backfill callback.bundle_name values while
in offline mode!
+ -- The bundle_name column will be added without migrating
existing dag-processor callback rows.
+ -- Run this migration in online mode if you need existing pending
callbacks backfilled.
+ """
+ )
+ return
+
+ conn = op.get_bind()
+ callback = sa.table(
+ "callback",
+ sa.column("id", sa.Uuid()),
+ sa.column("type", sa.String(length=20)),
+ sa.column("data", ExtendedJSON()),
+ sa.column("bundle_name", StringID()),
+ )
+
+ rows = conn.execute(
+ sa.select(callback.c.id, callback.c.data).where(callback.c.type ==
"dag_processor")
+ ).mappings()
+ for row in rows:
+ data = row["data"] or {}
+ if isinstance(data, str):
+ data = json.loads(data)
+
+ req_data = data.get("req_data")
+ if isinstance(req_data, str):
+ req_data = json.loads(req_data)
+ elif not isinstance(req_data, dict):
+ continue
+
+ bundle_name = req_data.get("bundle_name")
+ if bundle_name is None:
+ continue
+
+ conn.execute(callback.update().where(callback.c.id ==
row["id"]).values(bundle_name=bundle_name))
+
+
+def downgrade():
+ """Remove bundle_name from callback rows."""
+ with op.batch_alter_table("callback", schema=None) as batch_op:
+ batch_op.drop_column("bundle_name")
diff --git a/airflow-core/src/airflow/models/callback.py
b/airflow-core/src/airflow/models/callback.py
index e2c46153a71..e08d58fa4da 100644
--- a/airflow-core/src/airflow/models/callback.py
+++ b/airflow-core/src/airflow/models/callback.py
@@ -34,6 +34,7 @@ from airflow._shared.timezones import timezone
from airflow.executors.workloads import BaseWorkload
from airflow.executors.workloads.callback import CallbackFetchMethod
from airflow.models import Base
+from airflow.models.base import StringID
from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime
from airflow.utils.state import CallbackState
@@ -101,7 +102,6 @@ class Callback(Base, BaseWorkload):
"""Base class for callbacks."""
__tablename__ = "callback"
-
id: Mapped[UUID] = mapped_column(Uuid(), primary_key=True,
default=uuid6.uuid7)
# This is used by SQLAlchemy to be able to deserialize DB rows to
subclasses
@@ -117,6 +117,9 @@ class Callback(Base, BaseWorkload):
# Used by subclasses to store information about how to run the callback
data: Mapped[dict] = mapped_column(ExtendedJSON, nullable=False)
+ # Used by the dag processor to filter callbacks by bundle name.
+ bundle_name: Mapped[str | None] = mapped_column(StringID(), nullable=True)
+
# State of the Callback of type: CallbackState. Can be null for instances
of DagProcessorCallback.
state: Mapped[str | None] = mapped_column(String(10))
@@ -269,6 +272,7 @@ class DagProcessorCallback(Callback):
self.fetch_method = CallbackFetchMethod.DAG_ATTRIBUTE
self.state = None
+ self.bundle_name = callback.bundle_name
self.data |= {"req_class": callback.__class__.__name__, "req_data":
callback.to_json()}
def get_callback_request(self) -> CallbackRequest:
diff --git a/airflow-core/src/airflow/utils/db.py
b/airflow-core/src/airflow/utils/db.py
index dae74d44cc7..9bc0608611b 100644
--- a/airflow-core/src/airflow/utils/db.py
+++ b/airflow-core/src/airflow/utils/db.py
@@ -115,7 +115,7 @@ _REVISION_HEADS_MAP: dict[str, str] = {
"3.0.3": "fe199e1abd77",
"3.1.0": "cc92b33c6709",
"3.1.8": "509b94a1042d",
- "3.2.0": "888b59e02a5b",
+ "3.2.0": "1d6611b6ab7c",
}
# Prefix used to identify tables holding data moved during migration.
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py
b/airflow-core/tests/unit/dag_processing/test_manager.py
index 00b0e50779c..40a8c4f0b66 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -1097,6 +1097,61 @@ class TestDagFileProcessorManager:
remaining_req = remaining[0].get_callback_request()
assert remaining_req.bundle_name == "other-bundle"
+ @conf_vars(
+ {
+ ("dag_processor", "max_callbacks_per_loop"): "2",
+ ("core", "load_examples"): "False",
+ }
+ )
+ def test_fetch_callbacks_filters_by_bundle_before_limit(self,
configure_testing_dag_bundle):
+ dag_filepath = TEST_DAG_FOLDER / "test_on_failure_callback_dag.py"
+
+ matching = DagCallbackRequest(
+ dag_id="test_start_date_scheduling",
+ bundle_name="testing",
+ bundle_version=None,
+ filepath="test_on_failure_callback_dag.py",
+ is_failure_callback=True,
+ run_id="match",
+ )
+ non_matching_1 = DagCallbackRequest(
+ dag_id="test_start_date_scheduling",
+ bundle_name="other-bundle-a",
+ bundle_version=None,
+ filepath="test_on_failure_callback_dag.py",
+ is_failure_callback=True,
+ run_id="no-match-1",
+ )
+ non_matching_2 = DagCallbackRequest(
+ dag_id="test_start_date_scheduling",
+ bundle_name="other-bundle-b",
+ bundle_version=None,
+ filepath="test_on_failure_callback_dag.py",
+ is_failure_callback=True,
+ run_id="no-match-2",
+ )
+
+ with create_session() as session:
+ session.add(DbCallbackRequest(callback=non_matching_1,
priority_weight=300))
+ session.add(DbCallbackRequest(callback=non_matching_2,
priority_weight=200))
+ session.add(DbCallbackRequest(callback=matching,
priority_weight=100))
+
+ with configure_testing_dag_bundle(dag_filepath):
+ manager = DagFileProcessorManager(max_runs=1)
+ manager._dag_bundles =
list(DagBundlesManager().get_all_dag_bundles())
+
+ with create_session() as session:
+ callbacks = manager._fetch_callbacks(session=session)
+
+ assert [c.run_id for c in callbacks] == ["match"]
+
+ remaining = session.scalars(select(DbCallbackRequest)).all()
+ assert len(remaining) == 2
+ assert {callback.bundle_name for callback in remaining} == {
+ "other-bundle-a",
+ "other-bundle-b",
+ }
+
@mock.patch.object(DagFileProcessorManager, "_get_logger_for_dag_file")
def test_callback_queue(self, mock_get_logger,
configure_testing_dag_bundle):
mock_logger = MagicMock()
diff --git a/airflow-core/tests/unit/models/test_callback.py
b/airflow-core/tests/unit/models/test_callback.py
index 20bbba29fc1..5d4940b77ad 100644
--- a/airflow-core/tests/unit/models/test_callback.py
+++ b/airflow-core/tests/unit/models/test_callback.py
@@ -21,11 +21,13 @@ from unittest.mock import patch
import pytest
from sqlalchemy import select
+from airflow.callbacks.callback_requests import DagCallbackRequest
from airflow.models import Trigger
from airflow.models.callback import (
Callback,
CallbackFetchMethod,
CallbackState,
+ DagProcessorCallback,
ExecutorCallback,
TriggererCallback,
_accepts_context,
@@ -210,7 +212,26 @@ class TestExecutorCallback:
assert callback.state == CallbackState.QUEUED
-# Note: class DagProcessorCallback is tested in
airflow-core/tests/unit/dag_processing/test_manager.py
+class TestDagProcessorCallback:
+ def test_polymorphic_serde(self, session):
+ callback_request = DagCallbackRequest(
+ dag_id="test_start_date_scheduling",
+ bundle_name="testing",
+ bundle_version=None,
+ filepath="test_on_failure_callback_dag.py",
+ is_failure_callback=True,
+ run_id="123",
+ )
+ callback = DagProcessorCallback(priority_weight=11,
callback=callback_request)
+ session.add(callback)
+ session.commit()
+
+ retrieved = session.scalar(select(Callback).where(Callback.id ==
callback.id))
+ assert isinstance(retrieved, DagProcessorCallback)
+ assert retrieved.fetch_method == CallbackFetchMethod.DAG_ATTRIBUTE
+ assert retrieved.bundle_name == "testing"
+ assert retrieved.priority_weight == 11
+ assert retrieved.state is None
class TestAcceptsContext: