This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f4f8853888f Fix `DepContext` mutation leak and restore
`reschedule-mode` guard (#62089)
f4f8853888f is described below
commit f4f8853888f46469950df3b19db0a96a301d7697
Author: Kaxil Naik <[email protected]>
AuthorDate: Tue Feb 17 19:30:29 2026 +0000
Fix `DepContext` mutation leak and restore `reschedule-mode` guard (#62089)
Two related fixes for issues introduced in #59604:
1. `are_dependencies_met()` mutated the caller's `dep_context.deps` set
in place when adding `ReadyToRescheduleDep` for task instances (TIs) in
the UP_FOR_RESCHEDULE state. Because the scheduler shares one `DepContext`
across all TIs in a loop, the added dep leaked into every subsequent,
unrelated TI evaluated with that same context.
Fix: use `attrs.evolve` to create a new `DepContext` instead of mutating
the shared one.
2. `ReadyToRescheduleDep` lost its fast-exit guard for non-reschedule
tasks in NONE state. Without it, the dep queried the `task_reschedule`
table for every task on each scheduling loop.
Fix: restore the guard, which short-circuits for non-reschedule,
non-mapped tasks in NONE state, while still honoring `reschedule_date`
for tasks explicitly in UP_FOR_RESCHEDULE state.
---
airflow-core/src/airflow/models/taskinstance.py | 15 +++++++-------
.../airflow/ti_deps/deps/ready_to_reschedule.py | 8 ++++++++
.../tests/unit/models/test_taskinstance.py | 23 ++++++++++++++++++++++
.../ti_deps/deps/test_ready_to_reschedule_dep.py | 8 ++++++++
4 files changed, 46 insertions(+), 8 deletions(-)
diff --git a/airflow-core/src/airflow/models/taskinstance.py
b/airflow-core/src/airflow/models/taskinstance.py
index 25168a41c01..475cbd7ae68 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -942,16 +942,15 @@ class TaskInstance(Base, LoggingMixin):
"""
dep_context = dep_context or DepContext()
if self.state == TaskInstanceState.UP_FOR_RESCHEDULE:
- # This DepContext is used when a task instance is in
UP_FOR_RESCHEDULE state.
- #
# Tasks can be put into UP_FOR_RESCHEDULE by the task runner
itself (e.g. when
- # the worker cannot load the Dag or task). In this case, the
scheduler must respect
- # the task instance's reschedule_date before scheduling it again.
+ # the worker cannot load the DAG or task). The scheduler must
respect the
+ # reschedule_date before scheduling it again.
#
- # ReadyToRescheduleDep is the only dependency that enforces this
time-based gating.
- # We therefore extend the normal scheduling dependency set with
it, instead of
- # modifying the global scheduler dependencies.
- dep_context.deps.add(ReadyToRescheduleDep())
+ # We use attrs.evolve to create a *new* DepContext with
ReadyToRescheduleDep added,
+ # instead of mutating the caller's dep_context.deps set in-place.
The same
+ # dep_context is shared across all TIs in a scheduler loop, so
mutating it would
+ # permanently leak the dep into subsequent, unrelated TIs.
+ dep_context = attrs.evolve(dep_context, deps=dep_context.deps |
{ReadyToRescheduleDep()})
failed = False
verbose_aware_logger = self.log.info if verbose else self.log.debug
for dep_status in
self.get_failed_dep_statuses(dep_context=dep_context, session=session):
diff --git a/airflow-core/src/airflow/ti_deps/deps/ready_to_reschedule.py
b/airflow-core/src/airflow/ti_deps/deps/ready_to_reschedule.py
index feadd5587c3..6f611c43d61 100644
--- a/airflow-core/src/airflow/ti_deps/deps/ready_to_reschedule.py
+++ b/airflow-core/src/airflow/ti_deps/deps/ready_to_reschedule.py
@@ -58,6 +58,14 @@ class ReadyToRescheduleDep(BaseTIDep):
This dependency fails if the latest reschedule request's reschedule
date is still
in the future.
"""
+ # Fast-exit for non-reschedule tasks in NONE state. When the task is
explicitly
+ # UP_FOR_RESCHEDULE we *always* check TaskReschedule regardless of
operator type
+ # (e.g. startup/DAG-load rescheduling). For NONE-state tasks, only
reschedule-mode
+ # sensors (and mapped tasks whose reschedule attr is unknown) need the
DB query.
+ if ti.state is None and ti.map_index < 0 and not getattr(ti.task,
"reschedule", False):
+ yield self._passing_status(reason="Task is not in reschedule
mode.")
+ return
+
if dep_context.ignore_in_reschedule_period:
yield self._passing_status(
reason="The context specified that being in a reschedule
period was permitted."
diff --git a/airflow-core/tests/unit/models/test_taskinstance.py
b/airflow-core/tests/unit/models/test_taskinstance.py
index 5ea9184a8b3..a013b09cdb0 100644
--- a/airflow-core/tests/unit/models/test_taskinstance.py
+++ b/airflow-core/tests/unit/models/test_taskinstance.py
@@ -1337,6 +1337,29 @@ class TestTaskInstance:
):
assert ti.are_dependencies_met()
+ def test_are_dependencies_met_does_not_mutate_shared_dep_context(self,
dag_maker, session):
+ """Verify that calling are_dependencies_met on an UP_FOR_RESCHEDULE TI
does not
+ mutate the caller's DepContext.deps set. The scheduler shares one
DepContext across
+ all TIs in a loop, so mutation would leak ReadyToRescheduleDep into
unrelated TIs."""
+ with dag_maker("test_depctx_no_mutation", serialized=True):
+ EmptyOperator(task_id="t")
+
+ dr = dag_maker.create_dagrun(session=session)
+ ti = dr.get_task_instance(task_id="t", session=session)
+ ti.state = TaskInstanceState.UP_FOR_RESCHEDULE
+ session.merge(ti)
+ session.flush()
+
+ dep_context = DepContext(deps=RUNNING_DEPS)
+ original_deps = dep_context.deps.copy()
+
+ ti.task = dr.dag.task_dict[ti.task_id]
+ ti.are_dependencies_met(dep_context=dep_context, session=session)
+
+ assert dep_context.deps == original_deps, (
+ "DepContext.deps was mutated — ReadyToRescheduleDep leaked into
the shared set"
+ )
+
@pytest.mark.parametrize(
("downstream_ti_state", "expected_are_dependents_done"),
[
diff --git
a/airflow-core/tests/unit/ti_deps/deps/test_ready_to_reschedule_dep.py
b/airflow-core/tests/unit/ti_deps/deps/test_ready_to_reschedule_dep.py
index 27d8f4674d5..04f13388a5f 100644
--- a/airflow-core/tests/unit/ti_deps/deps/test_ready_to_reschedule_dep.py
+++ b/airflow-core/tests/unit/ti_deps/deps/test_ready_to_reschedule_dep.py
@@ -107,6 +107,14 @@ class TestNotInReschedulePeriodDep:
ti = self._get_task_instance(State.UP_FOR_RETRY)
assert ReadyToRescheduleDep().is_met(ti=ti)
+ def
test_should_pass_without_db_query_for_non_reschedule_task_in_none_state(
+ self, not_expected_tr_db_call
+ ):
+ """Non-reschedule, non-mapped tasks in NONE state should short-circuit
without a DB query."""
+ ti = self._get_task_instance(State.NONE)
+ ti.task.reschedule = False
+ assert ReadyToRescheduleDep().is_met(ti=ti)
+
def test_should_pass_if_no_reschedule_record_exists(self):
ti = self._get_task_instance(State.NONE)
assert ReadyToRescheduleDep().is_met(ti=ti)