This is an automated email from the ASF dual-hosted git repository.

vatsrahul1001 pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-2-test by this push:
     new 06cdb3782c7 fix(scheduler): ignore stale executor success after defer reschedule (#66431) (#67089)
06cdb3782c7 is described below

commit 06cdb3782c73e5074e6dbeb889b50cb8ff34093b
Author: Rahul Vats <[email protected]>
AuthorDate: Mon May 18 15:04:10 2026 +0530

    fix(scheduler): ignore stale executor success after defer reschedule (#66431) (#67089)
    
    * fix(scheduler): ignore stale executor success after defer reschedule
    
    When a trigger moves a deferred task back to scheduled before the
    scheduler has processed the executor success event from the worker's
    defer exit, treat that event as benign (same try_number, next_method
    set) instead of as a state-mismatch failure.
    
    Closes #66374
    
    
    
    * Remove newsfragment for bugfix (per review)
    
    ---------
    
    
    (cherry picked from commit ac39596bd531f8df6092531b3bde7acb54fff16f)
    
    Co-authored-by: /-\ - Pedro Henrique Klein <[email protected]>
    Co-authored-by: Cursor <[email protected]>
---
 .../src/airflow/jobs/scheduler_job_runner.py       | 17 +++++--
 airflow-core/tests/unit/jobs/test_scheduler_job.py | 57 ++++++++++++++++++++++
 2 files changed, 71 insertions(+), 3 deletions(-)

diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py 
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index e70f346fb15..7b6a781ce89 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -1199,7 +1199,9 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         The method handles several key scenarios:
         1. **Normal task completion**: Updates task states for 
successful/failed tasks
         2. **External termination**: Detects tasks killed outside Airflow and 
marks them as failed
-        3. **Task requeuing**: Handles tasks that were requeued by other 
schedulers or executors
+        3. **Task requeuing**: Handles tasks that were requeued by other 
schedulers or executors,
+           and tasks moved to ``scheduled`` after a trigger fired so a stale 
executor success from the
+           pre-deferral worker exit does not fail the task instance
         4. **Callback processing**: Sends task callback requests to DAG 
Processor for execution
         5. **Email notifications**: Sends email notification requests to DAG 
Processor
 
@@ -1349,12 +1351,14 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                 ti.pid,
             )
 
-            # There are two scenarios why the same TI with the same try_number 
is queued
-            # after executor is finished with it:
+            # There are multiple scenarios why the same TI with the same 
try_number looks queued or
+            # waiting after the executor is finished with it:
             # 1) the TI was killed externally and it had no time to mark 
itself failed
             # - in this case we should mark it as failed here.
             # 2) the TI has been requeued after getting deferred - in this 
case either our executor has it
             # or the TI is queued by another job. Either ways we should not 
fail it.
+            # 3) the trigger already put the TI back to scheduled (resume 
after defer) but the executor success
+            # from the worker exit after defer() has not been processed yet - 
should not fail it.
 
             # All of this could also happen if the state is "running",
             # but that is handled by the scheduler detecting task instances 
without heartbeats.
@@ -1368,6 +1372,13 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
             ti_requeued = (
                 ti.queued_by_job_id != job_id  # Another scheduler has queued 
this task again
                 or executor.has_task(ti)  # This scheduler has this task 
already
+                or (
+                    # Resume-after-defer: trigger moved TI to scheduled 
(next_method set) before we saw the
+                    # executor success from the defer exit for the same 
try_number.
+                    ti.state == TaskInstanceState.SCHEDULED
+                    and state == TaskInstanceState.SUCCESS
+                    and ti.next_method is not None
+                )
             )
 
             if ti_queued and not ti_requeued:
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py 
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index ebd3bd737f6..e19bed5f52d 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -806,6 +806,63 @@ class TestSchedulerJob:
         self.job_runner.executor.callback_sink.send.assert_not_called()
         mock_stats_incr.assert_not_called()
 
+    @mock.patch("airflow.jobs.scheduler_job_runner.TaskCallbackRequest")
+    @mock.patch("airflow.jobs.scheduler_job_runner.Stats.incr")
+    def test_process_executor_events_stale_success_when_scheduled_after_defer(
+        self, mock_stats_incr, mock_task_callback, dag_maker
+    ):
+        """
+        Trigger moved TI to scheduled (resume after defer) before executor 
success from defer exit arrived.
+
+        Regression for https://github.com/apache/airflow/issues/66374 — must 
not treat as state mismatch.
+        """
+        dag_id = 
"test_process_executor_events_stale_success_scheduled_after_defer"
+        task_id_1 = "dummy_task"
+
+        session = settings.Session()
+        with dag_maker(dag_id=dag_id, fileloc="/test_path1/"):
+            task1 = EmptyOperator(task_id=task_id_1)
+        ti1 = dag_maker.create_dagrun().get_task_instance(task1.task_id)
+
+        executor = MockExecutor(do_update=False)
+        task_callback = mock.MagicMock()
+        mock_task_callback.return_value = task_callback
+        scheduler_job = Job()
+        session.add(scheduler_job)
+        session.flush()
+        self.job_runner = SchedulerJobRunner(scheduler_job, 
executors=[executor])
+
+        ti1.state = State.SCHEDULED
+        ti1.next_method = "execute_callback"
+        ti1.queued_by_job_id = scheduler_job.id
+        ti1.try_number = 1
+        session.merge(ti1)
+        session.commit()
+
+        executor.event_buffer[ti1.key] = State.SUCCESS, None
+        executor.has_task = mock.MagicMock(return_value=False)
+        mock_stats_incr.reset_mock()
+
+        self.job_runner._process_executor_events(executor=executor, 
session=session)
+        ti1.refresh_from_db(session=session)
+        assert ti1.state == State.SCHEDULED
+        self.job_runner.executor.callback_sink.send.assert_not_called()
+        mock_stats_incr.assert_not_called()
+
+        # Without next_method, scheduled + stale success is still a mismatch 
(e.g. external kill).
+        ti1.next_method = None
+        session.merge(ti1)
+        session.commit()
+
+        executor.event_buffer[ti1.key] = State.SUCCESS, None
+        mock_stats_incr.reset_mock()
+
+        self.job_runner._process_executor_events(executor=executor, 
session=session)
+        mock_stats_incr.assert_any_call(
+            "scheduler.tasks.killed_externally",
+            tags={"dag_id": dag_id, "task_id": ti1.task_id},
+        )
+
     @mock.patch("airflow.jobs.scheduler_job_runner.TaskCallbackRequest")
     @mock.patch("airflow.jobs.scheduler_job_runner.Stats.incr")
     def test_process_executor_events_multiple_try_numbers_warns(

Reply via email to