paultmathew opened a new pull request, #67297:
URL: https://github.com/apache/airflow/pull/67297

   Fix a key-shape mismatch between the execution API's ``/states`` endpoint and
   ``KubernetesPodTrigger.get_task_state``: the endpoint suffixes the response
   key with ``_{map_index}`` for mapped TIs, but the trigger looked the value
   up by plain ``task_id``, so the lookup raised ``KeyError`` for every mapped
   deferrable ``KubernetesPodOperator`` task. ``cleanup()``'s broad
   ``except Exception`` swallowed the error and defensively skipped
   ``hook.delete_pod()``, so Mark Failed in the UI did not stop the pod of a
   mapped deferrable KPO task. The pod stayed ``Running`` until
   ``active_deadline_seconds`` expired (often hours).
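A minimal sketch of the failure mode in plain Python. `get_task_state_pre_fix` and `cleanup_sketch` are hypothetical stand-ins for the trigger's methods, and the sketch assumes the cleanup path deletes the pod only when the looked-up state is something other than `deferred`; the real trigger's conditions may differ.

```python
from typing import Callable, Mapping


def get_task_state_pre_fix(states: Mapping[str, str], task_id: str) -> str:
    # Pre-fix behaviour: look the state up by plain task_id, ignoring map_index.
    return states[task_id]


def cleanup_sketch(
    states: Mapping[str, str],
    task_id: str,
    delete_pod: Callable[[], None],
) -> bool:
    """Hypothetical stand-in for the cleanup path; True if the pod was deleted."""
    try:
        state = get_task_state_pre_fix(states, task_id)
    except Exception:
        # Broad except: the state is unknown, so deletion is skipped to be safe.
        return False
    # Assumption: any state other than "deferred" (e.g. after Mark Failed)
    # means the task no longer needs its pod.
    if state != "deferred":
        delete_pod()
        return True
    return False


# For a mapped TI the response is keyed "map_group.task_a_2", so the plain
# "map_group.task_a" lookup raises KeyError and the pod is never deleted.
deleted = []
print(cleanup_sketch({"map_group.task_a_2": "failed"}, "map_group.task_a",
                     lambda: deleted.append("pod")))  # False
```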
   
   For continuous deferrable pollers expanded via ``.expand(...)`` this also
   caused overlapping-writer races against external systems on the next
   schedule, because the failed run's pod was still alive when the next run
   spawned its own pod.
   
   This PR composes the lookup key with the ``_{map_index}`` suffix when the
   TI is mapped, matching how the API serialises the response.
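A minimal sketch of the key composition, assuming (per Airflow's convention) that non-mapped TIs carry `map_index=-1`. `state_lookup_key` and `lookup_state` are hypothetical helpers for illustration, not the trigger's actual code.

```python
from typing import Mapping, Optional


def state_lookup_key(task_id: str, map_index: Optional[int]) -> str:
    """Return the key under which the /states response stores this TI's state.

    Hypothetical helper: mapped TIs (map_index >= 0) are keyed
    "<task_id>_<map_index>"; non-mapped TIs (map_index -1 or None) by task_id.
    """
    if map_index is None or map_index < 0:
        return task_id
    return f"{task_id}_{map_index}"


def lookup_state(states: Mapping[str, str], task_id: str, map_index: Optional[int]) -> str:
    # Still raises KeyError if the response has no entry for this TI.
    return states[state_lookup_key(task_id, map_index)]


states = {"map_group.task_a_2": "failed", "plain_task": "running"}
print(lookup_state(states, "map_group.task_a", 2))  # failed
print(lookup_state(states, "plain_task", -1))       # running
```

Building the key from the TI's own `map_index`, rather than parsing suffixes out of the response keys, keeps the lookup unambiguous even when a `task_id` itself happens to end in `_<digits>`.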
   
   closes: #67296
   
   ## Test plan
   
   ### Unit tests
   
   Three new tests in
   ``providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py``:
   
   - ``test_get_task_state_uses_task_id_for_non_mapped_ti`` -- regression guard
     for the non-mapped branch (response keyed by plain ``task_id``).
   - ``test_get_task_state_uses_composite_key_for_mapped_ti`` -- the bug repro:
     a mapped TI with ``task_id="map_group.task_a"`` and ``map_index=2`` returns
     the state stored under key ``"map_group.task_a_2"``.
   - ``test_get_task_state_raises_when_mapped_key_missing`` -- pins the wrapped
     ``AirflowException`` shape so callers (e.g. ``safe_to_cancel``'s
     ``except Exception``) keep matching.
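The behaviour the third test pins can be sketched as follows. `StateLookupError` is a hypothetical stand-in for `AirflowException` so the snippet runs without Airflow installed, and `get_task_state_sketch` is illustrative only. Wrapping with `raise ... from err` keeps the original `KeyError` available as `__cause__`, while callers that catch `Exception` still match.

```python
from typing import Mapping


class StateLookupError(Exception):
    """Hypothetical stand-in for AirflowException in this sketch."""


def get_task_state_sketch(states: Mapping[str, str], task_id: str, map_index: int) -> str:
    key = task_id if map_index < 0 else f"{task_id}_{map_index}"
    try:
        return states[key]
    except KeyError as err:
        # Wrap and chain: callers catching Exception still match, and the
        # original KeyError stays available as __cause__.
        raise StateLookupError(
            f"No state for task_id={task_id!r}, map_index={map_index}"
        ) from err


try:
    get_task_state_sketch({}, "map_group.task_a", 2)
except Exception as exc:  # the broad catch that callers rely on
    print(type(exc).__name__, type(exc.__cause__).__name__)  # StateLookupError KeyError
```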
   
   Local run:
   
   ```
   uv run --no-sync --project providers/cncf/kubernetes pytest \
     providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py \
     -k "get_task_state or safe_to_cancel"
   ```
   
   Result: ``5 passed, 2 skipped, 1 warning in 2.40s`` (the 2 skips are legacy
   Airflow < 3.3 paths unrelated to this PR).
   
   ### End-to-end smoke test on an EKS sandbox cluster
   
   Airflow 3.2.1 with ``apache-airflow-providers-cncf-kubernetes==10.16.0``,
   deferrable KPO tasks inside a mapped ``@task_group.expand(...)`` (the same
   shape as the issue's repro DAG).

   **Before this PR** (provider 10.16.0 unpatched) -- one of three TI logs
   after Mark Failed:
   
   ```
   [17:46:25.889914Z] WARNING - Could not determine task state during cleanup;
   skipping pod deletion to be safe.
   AirflowException: ('TaskInstance with dag_id: %s, ...', '...',
                      'map_group.task_a', '...', 2)
   ,KeyError: 'map_group.task_a'
       File pod.py, line 399 in get_task_state
   ```
   
   The pods stayed ``Running`` for the full 600s sleep with no kubelet events;
   only ``active_deadline_seconds`` (7200s here) would eventually have stopped
   a longer-running pod.
   
   **After this PR** -- same DAG, same Mark Failed:
   
   ```
   18:17:54  pod:      starting task_a-0 (duration=600s exit_code=0)  (x3)
   18:20:07  kubelet:  Killing: Stopping container base               (x3)
   18:20:07  pod:      received SIGTERM                               (x3)
   18:20:07  runtime:  Task ... deleted with exit code 0              (x3)
   ```
   
   End-to-end, ``Mark Failed`` -> ``delete_pod`` -> kubelet ``SIGTERM`` ->
   pod-side trap -> container ``exit 0`` -> kubelet cleanup completes in
   **~1 second** for all three mapped TIs. The previous
   ``Could not determine task state`` warning no longer fires.
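The smoke-test container's script is not part of this PR. A hypothetical Python entrypoint with the same shape (sleep for a fixed duration, exit 0 on `SIGTERM`) could look like this; `handle_sigterm` and `main` are illustrative names. When a pod is deleted, the kubelet sends `SIGTERM` to the container and only escalates to `SIGKILL` after the termination grace period, so a handler that exits promptly lets deletion finish quickly.

```python
import signal
import sys
import time


def handle_sigterm(signum, frame) -> None:
    # Hypothetical trap: log and exit cleanly so the kubelet does not have to
    # wait out the termination grace period and send SIGKILL.
    print("received SIGTERM", flush=True)
    sys.exit(0)


def main(duration_s: int = 600) -> None:
    signal.signal(signal.SIGTERM, handle_sigterm)
    print(f"starting (duration={duration_s}s)", flush=True)
    time.sleep(duration_s)  # interrupted by SIGTERM when the pod is deleted


if __name__ == "__main__":
    main()
```

An explicit handler matters when the script runs as PID 1 in the container, because Linux does not apply the default terminate action to PID 1 for signals it has no handler for.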
   
   ### Static checks
   
   - ``prek run ruff --files <changed>``: pass
   - ``prek run ruff-format --files <changed>``: pass
   
   ---
   
   ##### Was generative AI tooling used to co-author this PR?
   
   - [X] Yes -- Cursor / Claude Opus 4.7
   
   Generated-by: Cursor / Claude Opus 4.7 following [the guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#gen-ai-assisted-contributions)
   
   Made with [Cursor](https://cursor.com)

