BALOGUN-DAVID commented on code in PR #65970:
URL: https://github.com/apache/airflow/pull/65970#discussion_r3229883904


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2552,92 +2551,117 @@ def _maybe_requeue_stuck_ti(self, *, ti, session, 
executor):
 
         Otherwise, fail it.
         """
-        num_times_stuck = self._get_num_times_stuck_in_queued(ti, session)
+        current_ti = self._get_ti_still_stuck_in_queued(ti=ti, session=session)
+        if current_ti is None:
+            self.log.debug(
+                "Task changed state before queued-timeout recovery; skipping. 
task_instance=%s",
+                ti,
+            )
+            return
+
+        num_times_stuck = self._get_num_times_stuck_in_queued(current_ti, 
session)
         if num_times_stuck < self._num_stuck_queued_retries:
-            self.log.info("Task stuck in queued; will try to requeue. 
task_instance=%s", ti)
+            self.log.info("Task stuck in queued; will try to requeue. 
task_instance=%s", current_ti)
+            executor.revoke_task(ti=current_ti)
             session.add(
                 Log(
                     event=TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT,
-                    task_instance=ti.key,
+                    task_instance=current_ti.key,
                     extra=(
                         f"Task was in queued state for longer than 
{self._task_queued_timeout} "
                         "seconds; task state will be set back to scheduled."
                     ),
                 )
             )
-            self._reschedule_stuck_task(ti, session=session)
+            self._reschedule_stuck_task(current_ti, session=session)
         else:
             self.log.info(
                 "Task requeue attempts exceeded max; marking failed. 
task_instance=%s",
-                ti,
+                current_ti,
             )
             msg = f"Task was requeued more than 
{self._num_stuck_queued_retries} times and will be failed."
             session.add(
                 Log(
                     event="stuck in queued tries exceeded",
-                    task_instance=ti.key,
+                    task_instance=current_ti.key,
                     extra=msg,
                 )
             )
 
             try:
-                dag = 
self.scheduler_dag_bag.get_dag_for_run(dag_run=ti.dag_run, session=session)
-                task = dag.get_task(ti.task_id)
+                dag = 
self.scheduler_dag_bag.get_dag_for_run(dag_run=current_ti.dag_run, 
session=session)
+                task = dag.get_task(current_ti.task_id)
             except Exception:
                 self.log.warning(
                     "The DAG or task could not be found. If a failure callback 
exists, it will not be run.",
                     exc_info=True,
                 )
             else:
                 if task.has_on_failure_callback:
-                    if inspect(ti).detached:
-                        ti = session.merge(ti)
+                    if inspect(current_ti).detached:
+                        current_ti = session.merge(current_ti)
                     # Safely extract bundle info with fallback for legacy tasks
                     # (dag_version may be None after Airflow 2 → 3 migration).
                     _stuck_bundle_name = (
-                        ti.dag_version.bundle_name if ti.dag_version else 
ti.dag_model.bundle_name
+                        current_ti.dag_version.bundle_name
+                        if current_ti.dag_version
+                        else current_ti.dag_model.bundle_name
                     )
                     _stuck_bundle_version = (
-                        ti.dag_version.bundle_version if ti.dag_version else 
ti.dag_run.bundle_version
+                        current_ti.dag_version.bundle_version
+                        if current_ti.dag_version
+                        else current_ti.dag_run.bundle_version
                     )
                     # Backfill dag_version_id for legacy tasks (Pydantic 
requires uuid.UUID).
                     # Note: we cannot use `continue` here because this method 
is not
                     # inside a loop.  If backfilling fails we simply skip the 
callback.
-                    if _ensure_ti_has_dag_version_id(ti, session, self.log):
+                    if _ensure_ti_has_dag_version_id(current_ti, session, 
self.log):
                         request = TaskCallbackRequest(
-                            filepath=ti.dag_model.relative_fileloc or "",
+                            filepath=current_ti.dag_model.relative_fileloc or 
"",
                             bundle_name=_stuck_bundle_name,
                             bundle_version=_stuck_bundle_version,
-                            ti=ti,
+                            ti=current_ti,
                             msg=msg,
                             context_from_server=TIRunContext(
-                                dag_run=ti.dag_run,
-                                max_tries=ti.max_tries,
+                                dag_run=current_ti.dag_run,
+                                max_tries=current_ti.max_tries,
                                 variables=[],
                                 connections=[],
                                 xcom_keys_to_clear=[],
                             ),
                         )
                         executor.send_callback(request)
             finally:
-                ti.set_state(TaskInstanceState.FAILED, session=session)
-                executor.fail(ti.key)
+                current_ti.set_state(TaskInstanceState.FAILED, session=session)
+                executor.fail(current_ti.key)

Review Comment:
   I’ve updated the tries-exceeded failure path to also call 
`executor.revoke_task(ti=current_ti)` after the stale-state check passes and 
before the TI is marked failed, and added a regression test covering that path.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to