This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f4f8853888f Fix `DepContext` mutation leak and restore `reschedule-mode` guard (#62089)
f4f8853888f is described below

commit f4f8853888f46469950df3b19db0a96a301d7697
Author: Kaxil Naik <[email protected]>
AuthorDate: Tue Feb 17 19:30:29 2026 +0000

    Fix `DepContext` mutation leak and restore `reschedule-mode` guard (#62089)
    
    Two related fixes for issues introduced in #59604:
    
    1. `are_dependencies_met()` mutated the caller's dep_context.deps set
       in-place when adding ReadyToRescheduleDep for UP_FOR_RESCHEDULE TIs.
       Since the scheduler shares one DepContext across all TIs in a loop,
       this permanently leaked the dep into unrelated TIs.
       Fix: use `attrs.evolve` to create a new DepContext instead.
    
    2. `ReadyToRescheduleDep` lost its fast-exit guard for non-reschedule
       tasks in NONE state. Without it, every task hit the `task_reschedule`
       table on each scheduling loop.
       Fix: restore the guard that short-circuits for non-reschedule,
       non-mapped tasks in NONE state while still honoring reschedule_date
       for tasks explicitly in UP_FOR_RESCHEDULE state.
---
 airflow-core/src/airflow/models/taskinstance.py    | 15 +++++++-------
 .../airflow/ti_deps/deps/ready_to_reschedule.py    |  8 ++++++++
 .../tests/unit/models/test_taskinstance.py         | 23 ++++++++++++++++++++++
 .../ti_deps/deps/test_ready_to_reschedule_dep.py   |  8 ++++++++
 4 files changed, 46 insertions(+), 8 deletions(-)

diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py
index 25168a41c01..475cbd7ae68 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -942,16 +942,15 @@ class TaskInstance(Base, LoggingMixin):
         """
         dep_context = dep_context or DepContext()
         if self.state == TaskInstanceState.UP_FOR_RESCHEDULE:
-            # This DepContext is used when a task instance is in UP_FOR_RESCHEDULE state.
-            #
             # Tasks can be put into UP_FOR_RESCHEDULE by the task runner itself (e.g. when
-            # the worker cannot load the Dag or task). In this case, the scheduler must respect
-            # the task instance's reschedule_date before scheduling it again.
+            # the worker cannot load the DAG or task). The scheduler must respect the
+            # reschedule_date before scheduling it again.
             #
-            # ReadyToRescheduleDep is the only dependency that enforces this time-based gating.
-            # We therefore extend the normal scheduling dependency set with it, instead of
-            # modifying the global scheduler dependencies.
-            dep_context.deps.add(ReadyToRescheduleDep())
+            # We use attrs.evolve to create a *new* DepContext with ReadyToRescheduleDep added,
+            # instead of mutating the caller's dep_context.deps set in-place.  The same
+            # dep_context is shared across all TIs in a scheduler loop, so mutating it would
+            # permanently leak the dep into subsequent, unrelated TIs.
+            dep_context = attrs.evolve(dep_context, deps=dep_context.deps | {ReadyToRescheduleDep()})
         failed = False
         verbose_aware_logger = self.log.info if verbose else self.log.debug
         for dep_status in self.get_failed_dep_statuses(dep_context=dep_context, session=session):
diff --git a/airflow-core/src/airflow/ti_deps/deps/ready_to_reschedule.py b/airflow-core/src/airflow/ti_deps/deps/ready_to_reschedule.py
index feadd5587c3..6f611c43d61 100644
--- a/airflow-core/src/airflow/ti_deps/deps/ready_to_reschedule.py
+++ b/airflow-core/src/airflow/ti_deps/deps/ready_to_reschedule.py
@@ -58,6 +58,14 @@ class ReadyToRescheduleDep(BaseTIDep):
         This dependency fails if the latest reschedule request's reschedule date is still
         in the future.
         """
+        # Fast-exit for non-reschedule tasks in NONE state.  When the task is explicitly
+        # UP_FOR_RESCHEDULE we *always* check TaskReschedule regardless of operator type
+        # (e.g. startup/DAG-load rescheduling).  For NONE-state tasks, only reschedule-mode
+        # sensors (and mapped tasks whose reschedule attr is unknown) need the DB query.
+        if ti.state is None and ti.map_index < 0 and not getattr(ti.task, "reschedule", False):
+            yield self._passing_status(reason="Task is not in reschedule mode.")
+            return
+
         if dep_context.ignore_in_reschedule_period:
             yield self._passing_status(
                 reason="The context specified that being in a reschedule period was permitted."
diff --git a/airflow-core/tests/unit/models/test_taskinstance.py b/airflow-core/tests/unit/models/test_taskinstance.py
index 5ea9184a8b3..a013b09cdb0 100644
--- a/airflow-core/tests/unit/models/test_taskinstance.py
+++ b/airflow-core/tests/unit/models/test_taskinstance.py
@@ -1337,6 +1337,29 @@ class TestTaskInstance:
         ):
             assert ti.are_dependencies_met()
 
+    def test_are_dependencies_met_does_not_mutate_shared_dep_context(self, dag_maker, session):
+        """Verify that calling are_dependencies_met on an UP_FOR_RESCHEDULE TI does not
+        mutate the caller's DepContext.deps set.  The scheduler shares one DepContext across
+        all TIs in a loop, so mutation would leak ReadyToRescheduleDep into unrelated TIs."""
+        with dag_maker("test_depctx_no_mutation", serialized=True):
+            EmptyOperator(task_id="t")
+
+        dr = dag_maker.create_dagrun(session=session)
+        ti = dr.get_task_instance(task_id="t", session=session)
+        ti.state = TaskInstanceState.UP_FOR_RESCHEDULE
+        session.merge(ti)
+        session.flush()
+
+        dep_context = DepContext(deps=RUNNING_DEPS)
+        original_deps = dep_context.deps.copy()
+
+        ti.task = dr.dag.task_dict[ti.task_id]
+        ti.are_dependencies_met(dep_context=dep_context, session=session)
+
+        assert dep_context.deps == original_deps, (
+            "DepContext.deps was mutated — ReadyToRescheduleDep leaked into the shared set"
+        )
+
     @pytest.mark.parametrize(
         ("downstream_ti_state", "expected_are_dependents_done"),
         [
diff --git a/airflow-core/tests/unit/ti_deps/deps/test_ready_to_reschedule_dep.py b/airflow-core/tests/unit/ti_deps/deps/test_ready_to_reschedule_dep.py
index 27d8f4674d5..04f13388a5f 100644
--- a/airflow-core/tests/unit/ti_deps/deps/test_ready_to_reschedule_dep.py
+++ b/airflow-core/tests/unit/ti_deps/deps/test_ready_to_reschedule_dep.py
@@ -107,6 +107,14 @@ class TestNotInReschedulePeriodDep:
         ti = self._get_task_instance(State.UP_FOR_RETRY)
         assert ReadyToRescheduleDep().is_met(ti=ti)
 
+    def test_should_pass_without_db_query_for_non_reschedule_task_in_none_state(
+        self, not_expected_tr_db_call
+    ):
+        """Non-reschedule, non-mapped tasks in NONE state should short-circuit without a DB query."""
+        ti = self._get_task_instance(State.NONE)
+        ti.task.reschedule = False
+        assert ReadyToRescheduleDep().is_met(ti=ti)
+
     def test_should_pass_if_no_reschedule_record_exists(self):
         ti = self._get_task_instance(State.NONE)
         assert ReadyToRescheduleDep().is_met(ti=ti)

Reply via email to