1fanwang commented on code in PR #65970:
URL: https://github.com/apache/airflow/pull/65970#discussion_r3227334816
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -2552,92 +2551,117 @@ def _maybe_requeue_stuck_ti(self, *, ti, session,
executor):
Otherwise, fail it.
"""
- num_times_stuck = self._get_num_times_stuck_in_queued(ti, session)
+ current_ti = self._get_ti_still_stuck_in_queued(ti=ti, session=session)
+ if current_ti is None:
+ self.log.debug(
+ "Task changed state before queued-timeout recovery; skipping.
task_instance=%s",
+ ti,
+ )
+ return
+
+ num_times_stuck = self._get_num_times_stuck_in_queued(current_ti,
session)
if num_times_stuck < self._num_stuck_queued_retries:
- self.log.info("Task stuck in queued; will try to requeue.
task_instance=%s", ti)
+ self.log.info("Task stuck in queued; will try to requeue.
task_instance=%s", current_ti)
+ executor.revoke_task(ti=current_ti)
session.add(
Log(
event=TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT,
- task_instance=ti.key,
+ task_instance=current_ti.key,
extra=(
f"Task was in queued state for longer than
{self._task_queued_timeout} "
"seconds; task state will be set back to scheduled."
),
)
)
- self._reschedule_stuck_task(ti, session=session)
+ self._reschedule_stuck_task(current_ti, session=session)
else:
self.log.info(
"Task requeue attempts exceeded max; marking failed.
task_instance=%s",
- ti,
+ current_ti,
)
msg = f"Task was requeued more than
{self._num_stuck_queued_retries} times and will be failed."
session.add(
Log(
event="stuck in queued tries exceeded",
- task_instance=ti.key,
+ task_instance=current_ti.key,
extra=msg,
)
)
try:
- dag =
self.scheduler_dag_bag.get_dag_for_run(dag_run=ti.dag_run, session=session)
- task = dag.get_task(ti.task_id)
+ dag =
self.scheduler_dag_bag.get_dag_for_run(dag_run=current_ti.dag_run,
session=session)
+ task = dag.get_task(current_ti.task_id)
except Exception:
self.log.warning(
"The DAG or task could not be found. If a failure callback
exists, it will not be run.",
exc_info=True,
)
else:
if task.has_on_failure_callback:
- if inspect(ti).detached:
- ti = session.merge(ti)
+ if inspect(current_ti).detached:
+ current_ti = session.merge(current_ti)
# Safely extract bundle info with fallback for legacy tasks
# (dag_version may be None after Airflow 2 → 3 migration).
_stuck_bundle_name = (
- ti.dag_version.bundle_name if ti.dag_version else
ti.dag_model.bundle_name
+ current_ti.dag_version.bundle_name
+ if current_ti.dag_version
+ else current_ti.dag_model.bundle_name
)
_stuck_bundle_version = (
- ti.dag_version.bundle_version if ti.dag_version else
ti.dag_run.bundle_version
+ current_ti.dag_version.bundle_version
+ if current_ti.dag_version
+ else current_ti.dag_run.bundle_version
)
# Backfill dag_version_id for legacy tasks (Pydantic
requires uuid.UUID).
# Note: we cannot use `continue` here because this method
is not
# inside a loop. If backfilling fails we simply skip the
callback.
- if _ensure_ti_has_dag_version_id(ti, session, self.log):
+ if _ensure_ti_has_dag_version_id(current_ti, session,
self.log):
request = TaskCallbackRequest(
- filepath=ti.dag_model.relative_fileloc or "",
+ filepath=current_ti.dag_model.relative_fileloc or
"",
bundle_name=_stuck_bundle_name,
bundle_version=_stuck_bundle_version,
- ti=ti,
+ ti=current_ti,
msg=msg,
context_from_server=TIRunContext(
- dag_run=ti.dag_run,
- max_tries=ti.max_tries,
+ dag_run=current_ti.dag_run,
+ max_tries=current_ti.max_tries,
variables=[],
connections=[],
xcom_keys_to_clear=[],
),
)
executor.send_callback(request)
finally:
- ti.set_state(TaskInstanceState.FAILED, session=session)
- executor.fail(ti.key)
+ current_ti.set_state(TaskInstanceState.FAILED, session=session)
+ executor.fail(current_ti.key)
Review Comment:
Quick sanity check on the fail branch — pre-patch,
`_handle_tasks_stuck_in_queued` called `executor.revoke_task(ti=ti)`
unconditionally before `_maybe_requeue_stuck_ti`, so both the requeue path and
the fail path went through revoke. After the move, the requeue branch still
calls `executor.revoke_task(ti=current_ti)` at line 2566, but this `finally:`
only does `executor.fail(current_ti.key)`.
`executor.fail()` in the base executor just calls `change_state(key, FAILED,
info)` — it pushes a FAILED event into the buffer but does not stop the worker.
Compare `revoke_task` on CeleryExecutor
([`celery_executor.py:393-401`](https://github.com/apache/airflow/blob/main/providers/celery/src/airflow/providers/celery/executors/celery_executor.py#L393-L401)),
which actually calls `celery_app.control.revoke(...)` on the running task and
pops it from `workloads`/`running`/`queued_tasks`. KubernetesExecutor's
`revoke_task` similarly patches and then deletes the pod.
Without a revoke in the fail path, once the recovery skip-check passes (i.e.
the TI really is still queued) and we exceed `num_stuck_queued_retries`, the
worker keeps running while the scheduler marks the TI FAILED — the same
divergence that the unconditional pre-patch revoke was protecting against. Was
the intent to also call `executor.revoke_task(ti=current_ti)` here before the
`set_state` + `fail`, or is there something I'm missing about the fail semantics?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org