This is an automated email from the ASF dual-hosted git repository.
vatsrahul1001 pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-2-test by this push:
new 06cdb3782c7 fix(scheduler): ignore stale executor success after defer
reschedule (#66431) (#67089)
06cdb3782c7 is described below
commit 06cdb3782c73e5074e6dbeb889b50cb8ff34093b
Author: Rahul Vats <[email protected]>
AuthorDate: Mon May 18 15:04:10 2026 +0530
fix(scheduler): ignore stale executor success after defer reschedule
(#66431) (#67089)
* fix(scheduler): ignore stale executor success after defer reschedule
When a trigger moves a deferred task back to scheduled before the
scheduler has processed the executor success event from the worker's
exit after defer(), treat that event as benign (same try_number,
next_method set) instead of failing the task with a state mismatch.
Closes #66374
* Remove newsfragment for bugfix (per review)
---------
(cherry picked from commit ac39596bd531f8df6092531b3bde7acb54fff16f)
Co-authored-by: /-\ - Pedro Henrique Klein <[email protected]>
Co-authored-by: Cursor <[email protected]>
---
.../src/airflow/jobs/scheduler_job_runner.py | 17 +++++--
airflow-core/tests/unit/jobs/test_scheduler_job.py | 57 ++++++++++++++++++++++
2 files changed, 71 insertions(+), 3 deletions(-)
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index e70f346fb15..7b6a781ce89 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -1199,7 +1199,9 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
The method handles several key scenarios:
1. **Normal task completion**: Updates task states for
successful/failed tasks
2. **External termination**: Detects tasks killed outside Airflow and
marks them as failed
- 3. **Task requeuing**: Handles tasks that were requeued by other
schedulers or executors
+ 3. **Task requeuing**: Handles tasks that were requeued by other
schedulers or executors,
+ and tasks moved to ``scheduled`` after a trigger fired so a stale
executor success from the
+ pre-deferral worker exit does not fail the task instance
4. **Callback processing**: Sends task callback requests to DAG
Processor for execution
5. **Email notifications**: Sends email notification requests to DAG
Processor
@@ -1349,12 +1351,14 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
ti.pid,
)
- # There are two scenarios why the same TI with the same try_number
is queued
- # after executor is finished with it:
+ # There are multiple scenarios why the same TI with the same
try_number looks queued or
+ # waiting after the executor is finished with it:
# 1) the TI was killed externally and it had no time to mark
itself failed
# - in this case we should mark it as failed here.
# 2) the TI has been requeued after getting deferred - in this
case either our executor has it
# or the TI is queued by another job. Either ways we should not
fail it.
+ # 3) the trigger already put the TI back to scheduled (resume
after defer) but the executor success
+ # from the worker exit after defer() has not been processed yet -
should not fail it.
# All of this could also happen if the state is "running",
# but that is handled by the scheduler detecting task instances
without heartbeats.
@@ -1368,6 +1372,13 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
ti_requeued = (
ti.queued_by_job_id != job_id # Another scheduler has queued
this task again
or executor.has_task(ti) # This scheduler has this task
already
+ or (
+ # Resume-after-defer: trigger moved TI to scheduled
(next_method set) before we saw the
+ # executor success from the defer exit for the same
try_number.
+ ti.state == TaskInstanceState.SCHEDULED
+ and state == TaskInstanceState.SUCCESS
+ and ti.next_method is not None
+ )
)
if ti_queued and not ti_requeued:
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py
b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index ebd3bd737f6..e19bed5f52d 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -806,6 +806,63 @@ class TestSchedulerJob:
self.job_runner.executor.callback_sink.send.assert_not_called()
mock_stats_incr.assert_not_called()
+ @mock.patch("airflow.jobs.scheduler_job_runner.TaskCallbackRequest")
+ @mock.patch("airflow.jobs.scheduler_job_runner.Stats.incr")
+ def test_process_executor_events_stale_success_when_scheduled_after_defer(
+ self, mock_stats_incr, mock_task_callback, dag_maker
+ ):
+ """
+ Trigger moved TI to scheduled (resume after defer) before executor
success from defer exit arrived.
+
+ Regression for https://github.com/apache/airflow/issues/66374 — must
not treat as state mismatch.
+ """
+ dag_id =
"test_process_executor_events_stale_success_scheduled_after_defer"
+ task_id_1 = "dummy_task"
+
+ session = settings.Session()
+ with dag_maker(dag_id=dag_id, fileloc="/test_path1/"):
+ task1 = EmptyOperator(task_id=task_id_1)
+ ti1 = dag_maker.create_dagrun().get_task_instance(task1.task_id)
+
+ executor = MockExecutor(do_update=False)
+ task_callback = mock.MagicMock()
+ mock_task_callback.return_value = task_callback
+ scheduler_job = Job()
+ session.add(scheduler_job)
+ session.flush()
+ self.job_runner = SchedulerJobRunner(scheduler_job,
executors=[executor])
+
+ ti1.state = State.SCHEDULED
+ ti1.next_method = "execute_callback"
+ ti1.queued_by_job_id = scheduler_job.id
+ ti1.try_number = 1
+ session.merge(ti1)
+ session.commit()
+
+ executor.event_buffer[ti1.key] = State.SUCCESS, None
+ executor.has_task = mock.MagicMock(return_value=False)
+ mock_stats_incr.reset_mock()
+
+ self.job_runner._process_executor_events(executor=executor,
session=session)
+ ti1.refresh_from_db(session=session)
+ assert ti1.state == State.SCHEDULED
+ self.job_runner.executor.callback_sink.send.assert_not_called()
+ mock_stats_incr.assert_not_called()
+
+ # Without next_method, scheduled + stale success is still a mismatch
(e.g. external kill).
+ ti1.next_method = None
+ session.merge(ti1)
+ session.commit()
+
+ executor.event_buffer[ti1.key] = State.SUCCESS, None
+ mock_stats_incr.reset_mock()
+
+ self.job_runner._process_executor_events(executor=executor,
session=session)
+ mock_stats_incr.assert_any_call(
+ "scheduler.tasks.killed_externally",
+ tags={"dag_id": dag_id, "task_id": ti1.task_id},
+ )
+
@mock.patch("airflow.jobs.scheduler_job_runner.TaskCallbackRequest")
@mock.patch("airflow.jobs.scheduler_job_runner.Stats.incr")
def test_process_executor_events_multiple_try_numbers_warns(