ihorlukianov opened a new pull request, #68674:
URL: https://github.com/apache/airflow/pull/68674
Airflow version observed: 3.2.1
When running a moderate load of mapped tasks (~100+ concurrent worker pods
completing), the scheduler issues **thousands of redundant Kubernetes DELETE
calls** for the same already-finished pods.
The same pod name is deleted many times within seconds:
```
2026-06-11T18:49:36.108968Z Deleting pod trigger-test-dags-trigger-zwyr4icn
...
2026-06-11T18:49:36.228925Z Deleting pod trigger-test-dags-trigger-zwyr4icn
...
2026-06-11T18:49:36.397320Z Deleting pod trigger-test-dags-trigger-zwyr4icn
...
... (166 total for this one pod)
```
This starves the scheduler loop: in my case, with ~1,855 scheduled TIs
waiting, most of each loop is spent re-deleting finished pods instead of
launching new ones.
### Expected behaviour
Each finished worker pod should be deleted **once** (or marked done once
when `delete_worker_pods=False`). Subsequent scheduler loops should not
re-issue delete calls for pods already processed.
### Reproduce
1. Deploy Airflow 3.2.x with `KubernetesExecutor` and
`delete_worker_pods=True`.
2. Trigger DAGs with many mapped tasks (e.g. 100+ mapped `trigger` tasks) so
pods complete in quick succession.
3. Inspect scheduler logs:
```bash
kubectl logs <scheduler-pod> -c scheduler | grep "Deleting pod" \
  | sed 's/.*Deleting pod \([^ ]*\) in.*/\1/' | sort | uniq -c | sort -rn \
  | head
```
4. Observe the same pod names with delete counts >> 1.
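The same count can be taken in Python when the logs are already saved to a file or captured in a test. This is a minimal sketch: the helper names `count_deletes` and `repeated_deletes` are hypothetical, and the regex only assumes that the pod name follows the literal text `Deleting pod`, as in the log excerpt above.

```python
import re
from collections import Counter

# Captures the pod name that follows "Deleting pod" in a scheduler log line.
DELETE_RE = re.compile(r"Deleting pod (\S+)")


def count_deletes(lines):
    """Return a Counter mapping pod name -> number of delete log lines."""
    counts = Counter()
    for line in lines:
        match = DELETE_RE.search(line)
        if match:
            counts[match.group(1)] += 1
    return counts


def repeated_deletes(lines, threshold=1):
    """Return (pod, count) pairs seen more than `threshold` times, most frequent first."""
    return [(pod, n) for pod, n in count_deletes(lines).most_common() if n > threshold]
```

Passing the lines of the scheduler log to `repeated_deletes()` should list only the pods that were deleted more than once.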
### RCA
Introduced/regressed around **#55797** (Oct 2025), which added a
`self.completed` set for orphaned completed pod adoption.
In `KubernetesExecutor.sync()`
(`providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py`):
1. **`self.completed` is processed inside the `while True` loop** that
drains `result_queue`: for every completion event, `_change_state()` runs
again for **all** entries in `self.completed`, each triggering `delete_pod()`:
```python
while True:
    results = self.result_queue.get_nowait()
    ...
    self._change_state(results)
    for result in self.completed:  # <-- nested inside while True
        self._change_state(result)
```
2. **`self.completed` is never cleared** after processing, so entries
accumulate and are re-processed on every subsequent completion event.
Resulting delete volume per `sync()` ≈
`num_result_queue_events × (1 + len(self.completed))`
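To make the multiplication concrete, here is a toy model of the drain loop described above. It is not the real executor: `BuggySyncModel` is a hypothetical class, `_change_state()` only counts calls instead of calling `delete_pod()`, and results are reduced to plain pod-name strings.

```python
from queue import Empty, Queue


class BuggySyncModel:
    """Toy model of the nested loop; names and structure are illustrative only."""

    def __init__(self, completed, events):
        self.completed = set(completed)  # adopted completed pods, never cleared
        self.result_queue = Queue()
        for event in events:
            self.result_queue.put(event)
        self.delete_calls = 0

    def _change_state(self, pod_name):
        # Stand-in for the real method, which ends in a delete_pod() call.
        self.delete_calls += 1

    def sync(self):
        while True:
            try:
                result = self.result_queue.get_nowait()
            except Empty:
                break
            self._change_state(result)
            for done in self.completed:  # re-runs for every queue event
                self._change_state(done)


model = BuggySyncModel(
    completed=[f"done-{i}" for i in range(50)],
    events=[f"pod-{i}" for i in range(100)],
)
model.sync()
print(model.delete_calls)  # 100 * (1 + 50) = 5100
```

With 100 queue events and 50 entries in `completed`, the model makes 5,100 delete calls in a single `sync()`, matching the formula.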
### Proposed fix
1. Move `for result in self.completed` **outside** the result-queue drain
loop (once per `sync()`).
2. **Clear** or discard from `self.completed` after successful processing.
3. Deduplicate by `pod_name` when adopting completed pods.
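The three steps above could look roughly like the following toy model. It is a sketch under stated assumptions, not the actual patch: `FixedSyncModel` and `enqueue()` are hypothetical, results are reduced to pod-name strings so that a `set` provides the deduplication, and `_change_state()` only counts calls.

```python
from queue import Empty, Queue


class FixedSyncModel:
    """Toy model of the proposed fix; names and structure are illustrative only."""

    def __init__(self, completed, events):
        # Keying the set by pod name deduplicates adopted completed pods.
        self.completed = set(completed)
        self.result_queue = Queue()
        for event in events:
            self.enqueue(event)
        self.delete_calls = 0

    def enqueue(self, pod_name):
        self.result_queue.put(pod_name)

    def _change_state(self, pod_name):
        # Stand-in for the real method, which ends in a delete_pod() call.
        self.delete_calls += 1

    def sync(self):
        # Drain the result queue without touching self.completed.
        while True:
            try:
                result = self.result_queue.get_nowait()
            except Empty:
                break
            self._change_state(result)
        # Process adopted completed pods once per sync(), outside the drain loop.
        for done in list(self.completed):
            self._change_state(done)
            # Discard only after processing succeeded, so a failure is retried.
            self.completed.discard(done)


model = FixedSyncModel(
    completed=[f"done-{i}" for i in range(50)] + ["done-0"],  # one duplicate
    events=[f"pod-{i}" for i in range(100)],
)
model.sync()
print(model.delete_calls)  # 100 + 50 = 150

for i in range(10):
    model.enqueue(f"late-{i}")
model.sync()
print(model.delete_calls)  # 150 + 10 = 160, completed pods are not revisited
```

Discarding each entry only after `_change_state()` returns means a pod whose processing raised stays in the set and is retried on the next `sync()`. Whether the real executor should retry in that case is a design choice left to the actual patch.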
### Expected impact
Before: `delete_calls ≈ num_result_events × (1 + len(completed))` per
`sync()`.
After: `delete_calls ≈ num_result_events + len(completed)` per `sync()`,
with `completed` cleared after processing.
---
##### Was generative AI tooling used to co-author this PR?
- [x] Yes
Generated-by: Cursor Composer following [the guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#gen-ai-assisted-contributions)