ihorlukianov opened a new pull request, #68674:
URL: https://github.com/apache/airflow/pull/68674

   Observed on Airflow 3.2.1.
   
   Under a moderate load of mapped tasks (100+ worker pods completing concurrently), the scheduler issues **thousands of redundant Kubernetes DELETE calls** for the same already-finished pods.
   The same pod name is deleted many times within seconds:
   ```
   2026-06-11T18:49:36.108968Z  Deleting pod trigger-test-dags-trigger-zwyr4icn ...
   2026-06-11T18:49:36.228925Z  Deleting pod trigger-test-dags-trigger-zwyr4icn ...
   2026-06-11T18:49:36.397320Z  Deleting pod trigger-test-dags-trigger-zwyr4icn ...
   ... (166 total for this one pod)
   ```
   This starves the scheduler loop: with ~1,855 scheduled TIs waiting in my case, most of each loop is spent re-deleting finished pods instead of launching new ones.
   
   ### Expected behaviour
   Each finished worker pod should be deleted **once** (or marked done once 
when `delete_worker_pods=False`). Subsequent scheduler loops should not 
re-issue delete calls for pods already processed.
   
   ### Reproduce
   1. Deploy Airflow 3.2.x with `KubernetesExecutor` and 
`delete_worker_pods=True`.
   2. Trigger DAGs with many mapped tasks (e.g. 100+ mapped `trigger` tasks) so 
pods complete in quick succession.
   3. Inspect scheduler logs:
   
      ```bash
      kubectl logs <scheduler-pod> -c scheduler | grep "Deleting pod" \
        | sed 's/.*Deleting pod \([^ ]*\) in.*/\1/' | sort | uniq -c | sort -rn \
        | head
      ```
   4. Observe the same pod names with delete counts >> 1.
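
The same count can be taken in Python when the logs are already saved or need further processing. This is a minimal sketch: the helper name `count_pod_deletes`, the regex, and the sample lines (including the `in namespace airflow` suffix and the `pod-a`/`pod-b` names) are illustrative assumptions, not Airflow code or real log output.

```python
import re
from collections import Counter

# Assumption: each delete is logged as "... Deleting pod <name> ...".
# Adjust the pattern if the scheduler log format differs.
DELETE_RE = re.compile(r"Deleting pod (\S+)")


def count_pod_deletes(lines):
    """Return a Counter mapping pod name -> number of 'Deleting pod' lines."""
    counts = Counter()
    for line in lines:
        match = DELETE_RE.search(line)
        if match:
            counts[match.group(1)] += 1
    return counts


# Hypothetical sample lines, for illustration only.
sample = [
    "t0 Deleting pod pod-a in namespace airflow",
    "t1 Deleting pod pod-a in namespace airflow",
    "t2 Deleting pod pod-b in namespace airflow",
    "t3 Deleting pod pod-a in namespace airflow",
    "t4 unrelated scheduler message",
]

for pod_name, count in count_pod_deletes(sample).most_common():
    print(count, pod_name)
```

Any pod whose count is greater than 1 was deleted redundantly.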
   
   ### RCA
   Introduced/regressed around **#55797** (Oct 2025), which added a 
`self.completed` set for orphaned completed pod adoption.
   In `KubernetesExecutor.sync()` 
(`providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py`):
   
   1. **`self.completed` is processed inside the `while True` loop** that drains `result_queue`. For every completion event, `_change_state()` is called again for **all** entries in `self.completed`, and each call triggers `delete_pod()`:
   
      ```python
      while True:
          results = self.result_queue.get_nowait()
          ...
          self._change_state(results)
   
          for result in self.completed:   # <-- nested inside while True
              self._change_state(result)
      ```
   
   2. **`self.completed` is never cleared** after processing, so entries 
accumulate and are re-processed on every subsequent completion event.
   
   With both issues combined, the delete volume per `sync()` is ≈ `num_result_events × (1 + len(self.completed))`.
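
The multiplication can be reproduced with a toy model of the loop shape described above. This is a sketch, not the executor code: `buggy_sync`, the inner `change_state` stand-in, and the event and pod names are all hypothetical, and each `change_state` call is assumed to end in exactly one delete.

```python
import queue


def buggy_sync(result_events, completed):
    """Toy model of the nested loop; returns the number of delete calls."""
    result_queue = queue.Queue()
    for event in result_events:
        result_queue.put(event)

    delete_calls = 0

    def change_state(_result):
        # Stand-in for _change_state(), assumed to end in one delete_pod().
        nonlocal delete_calls
        delete_calls += 1

    while True:
        try:
            result = result_queue.get_nowait()
        except queue.Empty:
            break
        change_state(result)
        for done in completed:  # nested: runs once per queue event
            change_state(done)
        # completed is never cleared, so it is replayed on the next event

    return delete_calls


events = [f"event-{i}" for i in range(100)]
completed = {f"pod-{i}" for i in range(20)}
calls = buggy_sync(events, completed)
print(calls)
```

With 100 queue events and 20 entries in `completed`, the model counts 100 × (1 + 20) deletes in a single pass.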
   
   ### Proposed fix
   1. Move `for result in self.completed` **outside** the result-queue drain 
loop (once per `sync()`).
   2. **Clear** `self.completed`, or discard each entry from it, after successful processing.
   3. Deduplicate by `pod_name` when adopting completed pods.
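
The three steps above can be sketched on a toy model. This is not the patch itself: `fixed_sync`, `adopt_completed`, the `(pod_name, state)` tuple shape, and the choice of a dict keyed by pod name are assumptions made for illustration.

```python
import queue


def adopt_completed(completed, pod_name, state):
    """Step 3: keying by pod_name means re-adopting a pod is a no-op."""
    completed.setdefault(pod_name, (pod_name, state))


def fixed_sync(result_events, completed):
    """Toy model of one sync() pass; returns pod names passed to delete."""
    result_queue = queue.Queue()
    for event in result_events:
        result_queue.put(event)

    deleted = []

    def change_state(result):
        # Stand-in for _change_state(), assumed to end in one delete_pod().
        pod_name, _state = result
        deleted.append(pod_name)

    # Drain the result queue; completed is no longer touched in here.
    while True:
        try:
            result = result_queue.get_nowait()
        except queue.Empty:
            break
        change_state(result)

    # Step 1: process adopted completed pods once per sync().
    for pod_name in list(completed):
        change_state(completed[pod_name])
        # Step 2: drop the entry so later passes do not replay it.
        del completed[pod_name]

    return deleted


completed = {}
for name in ["pod-a", "pod-b", "pod-a"]:  # pod-a adopted twice
    adopt_completed(completed, name, "success")

events = [(f"pod-{i}", "success") for i in range(3)]
first = fixed_sync(events, completed)
second = fixed_sync([], completed)
print(len(first), len(second))
```

In this model the first pass deletes each queue event and each adopted pod once, and a second pass with an empty queue issues no deletes.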
   
   
   ### Expected impact
   Before: `delete_calls ≈ num_result_events × (1 + len(completed))` per 
`sync()`.
   After: `delete_calls ≈ num_result_events + len(completed)` per `sync()`, 
with `completed` cleared after processing.
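
A quick worked comparison of the two estimates. The function names and the input sizes (100 events, 20 completed entries) are illustrative assumptions, not measurements.

```python
def delete_calls_before(num_result_events, num_completed):
    """Estimate with the nested loop: every event replays all completed entries."""
    return num_result_events * (1 + num_completed)


def delete_calls_after(num_result_events, num_completed):
    """Estimate with the fix: each event and each completed entry handled once."""
    return num_result_events + num_completed


before = delete_calls_before(100, 20)
after = delete_calls_after(100, 20)
print(before, after)
```

The gap grows with the product of the two sizes, which matches the symptom of a single pod being deleted well over a hundred times.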
   
   ---
   
   ##### Was generative AI tooling used to co-author this PR?
   
   <!--
   If generative AI tooling has been used in the process of authoring this PR, 
please
   change below checkbox to `[X]` followed by the name of the tool, uncomment 
the "Generated-by".
   -->
   
   - [x] Yes 
   Generated-by: [Cursor Composer] following [the guidelines](https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#gen-ai-assisted-contributions)
   

