paultmathew opened a new pull request, #67297:
URL: https://github.com/apache/airflow/pull/67297
Fix a key-shape mismatch between the execution API's ``/states`` endpoint and
``KubernetesPodTrigger.get_task_state``: for mapped TIs the endpoint suffixes
the response key with ``_{map_index}``, but the trigger looked the value up by
the plain ``task_id``, so the lookup raised ``KeyError`` for every mapped
deferrable ``KubernetesPodOperator`` task. The broad ``except Exception`` in
``cleanup()`` swallowed the error and defensively skipped
``hook.delete_pod()``, so Mark Failed in the UI did nothing to the pods of
mapped deferrable KPO tasks: each pod stayed ``Running`` until
``active_deadline_seconds`` expired (often hours later).
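
A minimal, self-contained sketch of that failure chain. It assumes (not confirmed by this PR) that the ``/states`` response is a nested mapping of ``run_id`` to state key to state, and that cleanup deletes the pod once the TI is no longer ``deferred``. ``naive_lookup`` and ``cleanup_sketch`` are hypothetical stand-ins for the trigger's ``get_task_state`` and ``cleanup()``, not the provider's actual code.

```python
# Hypothetical reproduction of the pre-fix failure chain (not provider code).

# Assumed /states response shape: {run_id: {state_key: state}}. For a mapped
# TI the API suffixes the key with "_{map_index}" (here map_index=2).
STATES = {"manual__run_1": {"map_group.task_a_2": "failed"}}


def naive_lookup(states: dict, run_id: str, task_id: str) -> str:
    # Pre-fix behaviour: index by the plain task_id and ignore map_index.
    return states[run_id][task_id]


def cleanup_sketch(states: dict, run_id: str, task_id: str, deleted: list) -> None:
    """Stand-in for cleanup(): delete the pod only when the TI state is known."""
    try:
        state = naive_lookup(states, run_id, task_id)
    except Exception as err:  # broad catch swallows the KeyError
        print(f"Could not determine task state ({err!r}); skipping pod deletion")
        return
    # Assumed rule for illustration: once the TI has left "deferred" (e.g. it
    # was marked failed), the pod is no longer needed.
    if state != "deferred":
        deleted.append(task_id)  # stands in for hook.delete_pod()


deleted_pods: list = []
cleanup_sketch(STATES, "manual__run_1", "map_group.task_a", deleted_pods)
print(deleted_pods)  # prints [] because the pod is never deleted
```
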
For continuous deferrable pollers expanded via ``.expand(...)``, this also
caused overlapping-writer races against external systems on the next scheduled
run, because the failed run's pod was still alive when the next run spawned
its own pod.
This PR builds the lookup key as ``{task_id}_{map_index}`` when the TI is
mapped (keeping the plain ``task_id`` otherwise), matching how the API
serialises the response.
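
The key composition itself is small. A sketch, assuming Airflow's convention that unmapped TIs carry ``map_index = -1`` and a nested ``{run_id: {key: state}}`` response shape; ``state_key`` and ``lookup_state`` are hypothetical names, not the helpers used in the patch.

```python
def state_key(task_id: str, map_index: int) -> str:
    """Build the key the /states response is assumed to use for a TI.

    Unmapped TIs (map_index < 0) are keyed by the plain task_id; mapped TIs
    get a "_{map_index}" suffix.
    """
    return task_id if map_index < 0 else f"{task_id}_{map_index}"


def lookup_state(states: dict, run_id: str, task_id: str, map_index: int) -> str:
    # Assumed response shape: {run_id: {state_key: state}}.
    return states[run_id][state_key(task_id, map_index)]


states = {
    "run_1": {
        "map_group.task_a_2": "failed",  # mapped TI, map_index=2
        "plain_task": "success",  # unmapped TI, map_index=-1
    }
}
print(lookup_state(states, "run_1", "map_group.task_a", 2))  # failed
print(lookup_state(states, "run_1", "plain_task", -1))  # success
```

Testing ``map_index < 0`` rather than the truthiness of ``map_index`` matters: ``map_index=0`` is a valid mapped index and must still produce a suffixed key.
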
closes: #67296
## Test plan
### Unit tests
Three new tests in
``providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py``:
- ``test_get_task_state_uses_task_id_for_non_mapped_ti`` -- regression guard
for the non-mapped branch (response keyed by plain ``task_id``).
- ``test_get_task_state_uses_composite_key_for_mapped_ti`` -- the bug repro:
a mapped TI with ``task_id="map_group.task_a"`` and ``map_index=2`` returns the
state stored under the key ``"map_group.task_a_2"``.
- ``test_get_task_state_raises_when_mapped_key_missing`` -- pins the shape of
the wrapped ``AirflowException`` raised when the mapped key is missing, so
callers that catch it (e.g. the ``except Exception`` in ``safe_to_cancel``)
keep matching.
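
The three cases can be mirrored in a self-contained ``unittest`` sketch. ``get_task_state_sketch`` and ``AirflowExceptionStandIn`` are hypothetical stand-ins (the real tests exercise ``KubernetesPodTrigger`` and Airflow's ``AirflowException``), and the nested ``{run_id: {key: state}}`` response shape is an assumption.

```python
import io
import unittest


class AirflowExceptionStandIn(Exception):
    """Hypothetical stand-in for Airflow's AirflowException."""


def get_task_state_sketch(states: dict, run_id: str, task_id: str, map_index: int) -> str:
    # Mirrors the fixed lookup: suffix the key only for mapped TIs.
    key = task_id if map_index < 0 else f"{task_id}_{map_index}"
    try:
        return states[run_id][key]
    except KeyError as err:
        # Wrap the KeyError so broad `except Exception` callers still match.
        raise AirflowExceptionStandIn(f"TaskInstance {key!r} not found in {run_id!r}") from err


class GetTaskStateSketchTest(unittest.TestCase):
    def test_uses_task_id_for_non_mapped_ti(self):
        states = {"run_1": {"task_a": "running"}}
        self.assertEqual(get_task_state_sketch(states, "run_1", "task_a", -1), "running")

    def test_uses_composite_key_for_mapped_ti(self):
        states = {"run_1": {"map_group.task_a_2": "failed"}}
        self.assertEqual(get_task_state_sketch(states, "run_1", "map_group.task_a", 2), "failed")

    def test_raises_when_mapped_key_missing(self):
        states = {"run_1": {"map_group.task_a_1": "failed"}}
        with self.assertRaises(AirflowExceptionStandIn) as ctx:
            get_task_state_sketch(states, "run_1", "map_group.task_a", 2)
        # The original KeyError is kept as the cause of the wrapped exception.
        self.assertIsInstance(ctx.exception.__cause__, KeyError)
        # Still an Exception subclass, so `except Exception` callers match.
        self.assertIsInstance(ctx.exception, Exception)


def run_sketch_tests() -> unittest.TestResult:
    """Run the sketch tests quietly and return the result object."""
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(GetTaskStateSketchTest)
    return unittest.TextTestRunner(stream=io.StringIO(), verbosity=0).run(suite)
```
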
Local run:
```
uv run --no-sync --project providers/cncf/kubernetes pytest \
  providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py \
  -k "get_task_state or safe_to_cancel"
```
Result: ``5 passed, 2 skipped, 1 warning in 2.40s`` (the 2 skipped tests cover
legacy Airflow < 3.3 code paths unrelated to this PR).
### End-to-end smoke test on an EKS sandbox cluster
Setup: Airflow 3.2.1 with ``apache-airflow-providers-cncf-kubernetes==10.16.0``,
running deferrable KPO tasks inside a mapped ``@task_group.expand(...)`` (the
same shape as the issue's repro DAG).
**Before this PR** (provider 10.16.0, unpatched) -- one of the three TI logs
after Mark Failed:
```
[17:46:25.889914Z] WARNING - Could not determine task state during cleanup;
skipping pod deletion to be safe.
AirflowException: ('TaskInstance with dag_id: %s, ...', '...',
'map_group.task_a', '...', 2)
,KeyError: 'map_group.task_a'
File pod.py, line 399 in get_task_state
```
Pods stayed ``Running`` for the full 600s sleep with nothing in the kubelet
event stream; a longer-running pod would not have been stopped until
``active_deadline_seconds`` (7200s here).
**After this PR** -- same DAG, same Mark Failed:
```
18:17:54 pod: starting task_a-0 (duration=600s exit_code=0) (x3)
18:20:07 kubelet: Killing: Stopping container base (x3)
18:20:07 pod: received SIGTERM (x3)
18:20:07 runtime: Task ... deleted with exit code 0 (x3)
```
End-to-end, ``Mark Failed`` -> ``delete_pod`` -> kubelet ``SIGTERM`` -> pod-side
trap -> container ``exit 0`` -> kubelet cleanup completes in **~1 second** for
all three mapped TIs, and the previous ``Could not determine task state``
warning no longer fires.
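
The pod-side half of that chain only requires the container to trap ``SIGTERM`` and exit cleanly. The PR does not show the smoke-test container's script, so the following is a hypothetical Python entrypoint with the same observable behaviour as the log above (a ``starting`` line, a ``received SIGTERM`` line, exit code 0).

```python
import signal
import sys
import time


def on_sigterm(signum, frame):
    """Handle the kubelet's SIGTERM: log it and exit with status 0."""
    print("pod: received SIGTERM", flush=True)
    sys.exit(0)


def main(duration: float) -> int:
    # Install the trap before doing any work, so a delete_pod() during the
    # sleep ends the container cleanly instead of waiting for SIGKILL.
    signal.signal(signal.SIGTERM, on_sigterm)
    print(f"pod: starting (duration={duration}s exit_code=0)", flush=True)
    time.sleep(duration)
    return 0
```

A container would run this with something like ``sys.exit(main(600))`` as its command; the ``__main__`` wiring is left out here so the sketch can be imported without sleeping.
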
### Static checks
- ``prek run ruff --files <changed>``: pass
- ``prek run ruff-format --files <changed>``: pass
---
##### Was generative AI tooling used to co-author this PR?
- [X] Yes -- Cursor / Claude Opus 4.7
Generated-by: Cursor / Claude Opus 4.7 following [the
guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#gen-ai-assisted-contributions)
Made with [Cursor](https://cursor.com)
--
This is an automated message from the Apache Git Service.