1fanwang commented on code in PR #65970:
URL: https://github.com/apache/airflow/pull/65970#discussion_r3227334816


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2552,92 +2551,117 @@ def _maybe_requeue_stuck_ti(self, *, ti, session, 
executor):
 
         Otherwise, fail it.
         """
-        num_times_stuck = self._get_num_times_stuck_in_queued(ti, session)
+        current_ti = self._get_ti_still_stuck_in_queued(ti=ti, session=session)
+        if current_ti is None:
+            self.log.debug(
+                "Task changed state before queued-timeout recovery; skipping. 
task_instance=%s",
+                ti,
+            )
+            return
+
+        num_times_stuck = self._get_num_times_stuck_in_queued(current_ti, 
session)
         if num_times_stuck < self._num_stuck_queued_retries:
-            self.log.info("Task stuck in queued; will try to requeue. 
task_instance=%s", ti)
+            self.log.info("Task stuck in queued; will try to requeue. 
task_instance=%s", current_ti)
+            executor.revoke_task(ti=current_ti)
             session.add(
                 Log(
                     event=TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT,
-                    task_instance=ti.key,
+                    task_instance=current_ti.key,
                     extra=(
                         f"Task was in queued state for longer than 
{self._task_queued_timeout} "
                         "seconds; task state will be set back to scheduled."
                     ),
                 )
             )
-            self._reschedule_stuck_task(ti, session=session)
+            self._reschedule_stuck_task(current_ti, session=session)
         else:
             self.log.info(
                 "Task requeue attempts exceeded max; marking failed. 
task_instance=%s",
-                ti,
+                current_ti,
             )
             msg = f"Task was requeued more than 
{self._num_stuck_queued_retries} times and will be failed."
             session.add(
                 Log(
                     event="stuck in queued tries exceeded",
-                    task_instance=ti.key,
+                    task_instance=current_ti.key,
                     extra=msg,
                 )
             )
 
             try:
-                dag = 
self.scheduler_dag_bag.get_dag_for_run(dag_run=ti.dag_run, session=session)
-                task = dag.get_task(ti.task_id)
+                dag = 
self.scheduler_dag_bag.get_dag_for_run(dag_run=current_ti.dag_run, 
session=session)
+                task = dag.get_task(current_ti.task_id)
             except Exception:
                 self.log.warning(
                     "The DAG or task could not be found. If a failure callback 
exists, it will not be run.",
                     exc_info=True,
                 )
             else:
                 if task.has_on_failure_callback:
-                    if inspect(ti).detached:
-                        ti = session.merge(ti)
+                    if inspect(current_ti).detached:
+                        current_ti = session.merge(current_ti)
                     # Safely extract bundle info with fallback for legacy tasks
                     # (dag_version may be None after Airflow 2 → 3 migration).
                     _stuck_bundle_name = (
-                        ti.dag_version.bundle_name if ti.dag_version else 
ti.dag_model.bundle_name
+                        current_ti.dag_version.bundle_name
+                        if current_ti.dag_version
+                        else current_ti.dag_model.bundle_name
                     )
                     _stuck_bundle_version = (
-                        ti.dag_version.bundle_version if ti.dag_version else 
ti.dag_run.bundle_version
+                        current_ti.dag_version.bundle_version
+                        if current_ti.dag_version
+                        else current_ti.dag_run.bundle_version
                     )
                     # Backfill dag_version_id for legacy tasks (Pydantic 
requires uuid.UUID).
                     # Note: we cannot use `continue` here because this method 
is not
                     # inside a loop.  If backfilling fails we simply skip the 
callback.
-                    if _ensure_ti_has_dag_version_id(ti, session, self.log):
+                    if _ensure_ti_has_dag_version_id(current_ti, session, 
self.log):
                         request = TaskCallbackRequest(
-                            filepath=ti.dag_model.relative_fileloc or "",
+                            filepath=current_ti.dag_model.relative_fileloc or 
"",
                             bundle_name=_stuck_bundle_name,
                             bundle_version=_stuck_bundle_version,
-                            ti=ti,
+                            ti=current_ti,
                             msg=msg,
                             context_from_server=TIRunContext(
-                                dag_run=ti.dag_run,
-                                max_tries=ti.max_tries,
+                                dag_run=current_ti.dag_run,
+                                max_tries=current_ti.max_tries,
                                 variables=[],
                                 connections=[],
                                 xcom_keys_to_clear=[],
                             ),
                         )
                         executor.send_callback(request)
             finally:
-                ti.set_state(TaskInstanceState.FAILED, session=session)
-                executor.fail(ti.key)
+                current_ti.set_state(TaskInstanceState.FAILED, session=session)
+                executor.fail(current_ti.key)

Review Comment:
   Quick sanity check on the fail branch — pre-patch, 
`_handle_tasks_stuck_in_queued` called `executor.revoke_task(ti=ti)` 
unconditionally before `_maybe_requeue_stuck_ti`, so both the requeue path and 
the fail path went through revoke. After the move, the requeue branch still 
calls `executor.revoke_task(ti=current_ti)` at line 2566, but this `finally:` 
only does `set_state(FAILED)` + `executor.fail(current_ti.key)` — no revoke.
   
   `executor.fail()` in the base executor just calls `change_state(key, FAILED, 
info)` — it pushes a FAILED event into the buffer but does not stop the worker. 
Compare `revoke_task` on CeleryExecutor 
([`celery_executor.py:393-401`](https://github.com/apache/airflow/blob/main/providers/celery/src/airflow/providers/celery/executors/celery_executor.py#L393-L401)),
 which actually calls `celery_app.control.revoke(...)` for the running task and 
pops it from `workloads`/`running`/`queued_tasks`. KubernetesExecutor's 
`revoke_task` similarly patches and deletes the pod.
   
   Without a revoke in the fail path, once the recovery skip-check passes (i.e. 
the TI really is still queued) and we exceed `num_stuck_queued_retries`, the 
worker keeps running while the scheduler marks the TI FAILED — the same 
divergence the unconditional pre-patch revoke was protecting against. Was the 
intent to also call `executor.revoke_task(ti=current_ti)` here before the 
`set_state` + `fail`, or is there something I'm missing about the fail 
semantics?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to