alixirs opened a new issue, #68683:
URL: https://github.com/apache/airflow/issues/68683

   ### Apache Airflow Provider(s)
   
   cncf-kubernetes
   
   ### Versions of Apache Airflow Providers
   
   `apache-airflow-providers-cncf-kubernetes==10.18.0` (latest at time of filing).
   
   The flaw is present in the current code on `main` as well. Relevant history:
   - The `self.completed` set and the `for result in self.completed: self._change_state(result)` loop were introduced in **10.10.0**.
   - The **periodic** `_adopt_completed_pods()` call from `sync()` (which regularly fills the set) was added in **10.15.0** (#61839).
   
   ### Apache Airflow version
   
   Observed on **2.11.0**. The affected code is provider code (`KubernetesExecutor`) and is identical on `main`, so 3.x is equally affected.
   
   ### Operating System
   
   N/A (Python code path). Observed on Azure AKS.
   
   ### Deployment
   
   Other 3rd-party Helm chart
   
   ### Deployment details
   
   Kubernetes, `KubernetesExecutor`, `delete_worker_pods=False`. The environment has completed (Succeeded) pods carrying an `airflow-worker` label that differs from the current scheduler's job id (multi-scheduler HA and/or scheduler restarts).
   
   ### What happened
   
   On a busy scheduler, `patch_pod_executor_done()` is called repeatedly against the **same** already-completed pods on every scheduler loop. In a ~96-second window we saw:
   
   - `patch_pod_executor_done` (`"Patching pod ... to mark it as done"`): **525** calls
   - `_change_state` patch log (`"Patched pod ... to mark it as done"`): **525**
   - `sync()` `"Changing state of ..."` (logged once per real result): **30**
   - `"Attempting to finish pod"` / `"finishing job"` (watcher → result_queue): **30** / **30**
   - `"Deleted pod ..."`: **0** (confirms `delete_worker_pods=False`)
   - Distinct pods patched: **32**; the busiest pods were patched **exactly 30 times each**.
   
   `525` patches with only `30` `"Changing state of"` lines is impossible if `_change_state` were reached only via the result queue, because every result-queue item logs `"Changing state of"` first. The extra ~495 calls come from the unlogged `self.completed` loop.
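
As a quick check of that inference, the counts above reduce to two lines of arithmetic. This assumes each result-queue item triggers at most one patch, so the remainder is a lower bound on patches issued by the `self.completed` loop.

```python
# Counts reported above for the ~96-second window.
total_patches = 525  # "Patching pod ... to mark it as done"
queue_results = 30   # "Changing state of ..." (one per result-queue item)

# Assumption: each result-queue item patches at most once, so this is a
# lower bound on the patches issued by the un-logged self.completed loop.
extra_patches = total_patches - queue_results

# That loop runs once per dequeued result, so dividing by the number of
# results gives the average number of entries re-patched per pass.
avg_entries_per_pass = extra_patches / queue_results

print(extra_patches, avg_entries_per_pass)  # 495 16.5
```

A pod that sat in the set for the whole window would be patched once per pass, i.e. 30 times, which would match the busiest pods listed above.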
   
   ### Root cause
   
   `KubernetesExecutor.sync()` re-runs `_change_state()` over the entire `self.completed` set, and **nothing ever removes entries from that set**.
   
   
`providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py`:
   ```python
       def sync(self) -> None:
           ...
           # (periodic) populate self.completed from completed pods owned by other/dead schedulers
           if now - self._last_completed_pod_adoption >= adoption_interval:  # L273
               self._last_completed_pod_adoption = now
               self._adopt_completed_pods(self.kube_client)  # L275
   
           ...
           with contextlib.suppress(Empty):
               while True:
                   results = self.result_queue.get_nowait()
                   try:
                       ...
                       self.log.info("Changing state of %s to %s", results, results.state)  # L289
                       self._change_state(results)  # L291
                   finally:
                       self.result_queue.task_done()
   
                   for result in self.completed:  # L303  <-- inside the while loop
                       self._change_state(result)  # L304  <-- never logged, never drained
   ```
   
   `_adopt_completed_pods()` adds to the set, but nothing ever clears or discards those entries:
   ```python
               ti_id = annotations_to_key(pod.metadata.annotations)
               self.completed.add(                                   # L816
                   KubernetesResults(key=ti_id, state="completed", ...)
               )
   ```
   
   `_change_state()` for these synthetic `state="completed"` entries patches the pod; then `self.running.remove(key)` raises `KeyError` (they were never in `running`) and the method returns early. No path in this method removes the entry from `self.completed`:
   ```python
           else:
               self.kube_scheduler.patch_pod_executor_done(pod_name=pod_name, namespace=namespace)  # L482
               self.log.info("Patched pod %s in namespace %s to mark it as done", key, namespace)  # L483
           try:
               self.running.remove(key)  # L486
           except KeyError:
               self.log.debug("TI key not in running, not adding to event_buffer: %s", key)
               return  # L489  <-- no removal from self.completed
   ```
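
A minimal, self-contained model of that control flow shows the consequence. `Result` and `ToyExecutor` are hypothetical stand-ins for `KubernetesResults` and the executor; only the shape (patch first, then `running.remove()` raising `KeyError` and returning early) is taken from the excerpt above.

```python
from typing import NamedTuple


class Result(NamedTuple):
    """Hypothetical stand-in for KubernetesResults."""
    key: str
    state: str


class ToyExecutor:
    """Hypothetical model of the _change_state() tail shown above."""

    def __init__(self) -> None:
        self.running: set[str] = set()
        self.completed: set[Result] = set()
        self.patch_calls: list[str] = []

    def _change_state(self, result: Result) -> None:
        # Mirrors patch_pod_executor_done(): the pod is patched unconditionally.
        self.patch_calls.append(result.key)
        try:
            self.running.remove(result.key)
        except KeyError:
            # Adopted pods were never in running, so we always land here
            # and return without touching self.completed.
            return


executor = ToyExecutor()
executor.completed.add(Result(key="pod-a", state="completed"))

# Three sync() passes over the set: the same pod is patched every time.
for _ in range(3):
    for result in executor.completed:
        executor._change_state(result)

print(len(executor.completed), executor.patch_calls)  # 1 ['pod-a', 'pod-a', 'pod-a']
```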
   
   Two compounding defects:
   1. **`self.completed` is never drained.** It is only ever declared (L117), iterated (L303), and added to (L816); there is no `.clear()`, `.discard()`, `.remove()`, or reassignment anywhere (verified in 10.18.0 and on `main`). So every adopted completed pod is re-PATCHed forever, and the set grows monotonically over the scheduler's lifetime.
   2. **The loop is nested inside the result-queue `while True`.** It runs once **per dequeued result**, not once per `sync()`. PATCH volume ≈ `(#results processed) × |self.completed|` (the `30 passes × ~16 pods ≈ ~500` we observed).
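
The multiplication in defect 2 can be reproduced with a toy model of the loop nesting. This is a sketch, not the executor: a plain `queue.Queue` stands in for `result_queue`, a set of strings stands in for `self.completed`, and incrementing a counter stands in for each PATCH issued by `_change_state()`.

```python
import contextlib
from queue import Empty, Queue


def buggy_sync(result_queue: Queue, completed: set[str]) -> int:
    """Model of the current nesting; returns the number of simulated PATCHes."""
    patches = 0
    with contextlib.suppress(Empty):
        while True:
            result_queue.get_nowait()  # raises Empty once the queue is drained
            try:
                patches += 1  # _change_state(results) for the dequeued item
            finally:
                result_queue.task_done()

            # Nested inside while True: re-runs once per dequeued result.
            for _ in completed:
                patches += 1  # _change_state(result); entry is never removed
    return patches


q: Queue = Queue()
for i in range(30):
    q.put(f"result-{i}")
adopted = {f"pod-{i}" for i in range(16)}

total = buggy_sync(q, adopted)
print(total, len(adopted))  # 510 16
```

With 30 results and 16 adopted entries this gives `30 + 30 × 16 = 510` simulated PATCHes, close to the 525 observed, and the set is still full afterwards, so the next `sync()` with results repeats the cost.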
   
   Permalinks (10.18.0), base `https://github.com/apache/airflow/blob/providers-cncf-kubernetes/10.18.0/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py`:
   - Declaration: `#L117`
   - Loop: `#L303-L304`
   - Add: `#L816`
   
   ### What you think should happen instead
   
   Adopted completed pods should be processed **once** and then dropped:
   - Move the `for result in self.completed` loop **out** of the result-queue `while True` so it runs once per `sync()`.
   - Drain the set after processing with `self.completed.clear()` (or `discard` each entry, conservatively only after a successful patch/delete, since `patch_pod_executor_done` swallows `ApiException` and a failed patch would otherwise go unnoticed).
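
The conservative variant in the second bullet could look roughly like the sketch below. It assumes a hypothetical `patch_fn` that returns `True` on success; because the real `patch_pod_executor_done` swallows `ApiException`, this approach would likely need that helper (or a wrapper around it) to surface the outcome.

```python
from collections.abc import Callable, Hashable


def drain_completed(
    completed: set[Hashable],
    patch_fn: Callable[[Hashable], bool],
) -> int:
    """Process each adopted entry once; drop it only if its patch succeeded.

    patch_fn is hypothetical: it stands in for a patch/delete call that
    reports success instead of swallowing the API error.
    Returns the number of entries drained in this pass.
    """
    drained = 0
    # Iterate over a snapshot so the set can be mutated safely.
    for entry in list(completed):
        if patch_fn(entry):
            completed.discard(entry)  # success: never revisit this pod
            drained += 1
        # On failure the entry stays in the set and is retried next sync().
    return drained


pending = {"pod-a", "pod-b", "pod-c"}
flaky = {"pod-b"}  # pretend the API call fails for this pod only
drained = drain_completed(pending, lambda pod: pod not in flaky)
print(drained, sorted(pending))  # 2 ['pod-b']
```

The trade-off versus a plain `clear()` is that persistent API failures keep an entry alive, but the retry set is bounded by failed pods rather than by every pod ever adopted.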
   
   Sketch:
   ```python
           with contextlib.suppress(Empty):
               while True:
                   results = self.result_queue.get_nowait()
                   try:
                       ...
                       self._change_state(results)
                   finally:
                       self.result_queue.task_done()
   
           # Process adopted completed pods once per sync, then drain.
           if self.completed:
               for result in list(self.completed):
                   self._change_state(result)
               self.completed.clear()
   ```
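
A toy model of the proposed structure shows the PATCH volume dropping to one per adopted pod. As an illustration only: a plain `queue.Queue` stands in for `result_queue`, and a counter stands in for each `_change_state()` PATCH.

```python
import contextlib
from queue import Empty, Queue


def fixed_sync(result_queue: Queue, completed: set[str]) -> int:
    """Model of the proposed structure; returns the number of simulated PATCHes."""
    patches = 0
    with contextlib.suppress(Empty):
        while True:
            result_queue.get_nowait()  # raises Empty once the queue is drained
            try:
                patches += 1  # _change_state(results)
            finally:
                result_queue.task_done()

    # Outside the while loop: runs once per sync(), then drains the set.
    if completed:
        for _ in list(completed):
            patches += 1  # _change_state(result)
        completed.clear()
    return patches


q: Queue = Queue()
for i in range(30):
    q.put(f"result-{i}")
adopted = {f"pod-{i}" for i in range(16)}

first = fixed_sync(q, adopted)   # 30 results + 16 adopted pods, once each
second = fixed_sync(q, adopted)  # queue and set are both empty now
print(first, second)  # 46 0
```

With the current nesting, the same inputs would cost `30 + 30 × 16 = 510` simulated PATCHes on every such `sync()`.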
   
   ### How to reproduce
   
   1. `KubernetesExecutor`, `delete_worker_pods=False`, provider `>= 10.15.0`.
   2. Produce Succeeded pods whose `airflow-worker` label differs from the current scheduler's job id, e.g. restart the scheduler (new job id) while Succeeded pods from the previous incarnation remain, or run ≥2 schedulers.
   3. Wait one `[scheduler] orphaned_tasks_check_interval` tick (default 300s) for `_adopt_completed_pods` to populate `self.completed`.
   4. Observe: on every subsequent `sync()` that processes any watcher result, every pod in `self.completed` is re-PATCHed (`"Patching pod ... to mark it as done"`), and the set never shrinks until the scheduler restarts.
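
To confirm step 2 before waiting for the adoption tick, a small helper built on the official `kubernetes` Python client can list Succeeded pods whose `airflow-worker` label differs from a given job id. The helper name, namespace, and job id are illustrative; the selector mirrors the condition described in step 2 and is not necessarily the exact query `_adopt_completed_pods()` issues.

```python
def list_foreign_succeeded_pods(core_v1, namespace: str, job_id: str) -> list[str]:
    """Return names of Succeeded pods whose airflow-worker label is not job_id.

    core_v1 is expected to be a kubernetes.client.CoreV1Api instance (or any
    object with a compatible list_namespaced_pod method).
    """
    pod_list = core_v1.list_namespaced_pod(
        namespace,
        # Require the label to exist AND differ; "key!=value" alone would
        # also match pods that have no airflow-worker label at all.
        label_selector=f"airflow-worker,airflow-worker!={job_id}",
        field_selector="status.phase=Succeeded",
    )
    return [pod.metadata.name for pod in pod_list.items]


# Example usage against a real cluster (assumes kubeconfig access):
#   from kubernetes import client, config
#   config.load_kube_config()
#   print(list_foreign_succeeded_pods(client.CoreV1Api(), "airflow", "<current job id>"))
```

A non-empty result means the next adoption tick should populate `self.completed`.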
   
   Quick check against scheduler logs:
   ```bash
   grep -c "Patching pod" scheduler.log            # high
   grep -c "Changing state of" scheduler.log       # much lower
   grep "Patching pod" scheduler.log \
     | sed -E 's/.*Patching pod ([a-z0-9-]+) in namespace.*/\1/' \
     | sort | uniq -c | sort -rn | head            # same pods, patched many times each
   ```
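
The same check can be done in Python, for example on logs exported from a log backend where `grep`/`sed` are not handy. It assumes the log lines contain the messages quoted above verbatim; the regex and the `summarize` helper are illustrative.

```python
import re
from collections import Counter
from collections.abc import Iterable

PATCHING = re.compile(r"Patching pod ([a-z0-9-]+) in namespace")
CHANGING = "Changing state of"


def summarize(lines: Iterable[str]) -> tuple[int, int, Counter]:
    """Return (patch count, state-change count, patches per pod)."""
    per_pod: Counter = Counter()
    changes = 0
    for line in lines:
        match = PATCHING.search(line)
        if match:
            per_pod[match.group(1)] += 1
        if CHANGING in line:
            changes += 1
    return sum(per_pod.values()), changes, per_pod


# In-memory sample; for a real file use:
#   with open("scheduler.log", encoding="utf-8") as fh:
#       patches, changes, per_pod = summarize(fh)
sample = [
    "INFO - Changing state of key-1 to success",
    "INFO - Patching pod pod-a in namespace airflow to mark it as done",
    "INFO - Patching pod pod-a in namespace airflow to mark it as done",
    "INFO - Patching pod pod-b in namespace airflow to mark it as done",
]
patches, changes, per_pod = summarize(sample)
print(patches, changes, per_pod.most_common(1))  # 3 1 [('pod-a', 2)]
```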
   
   ### Anything else
   
   - Impact: sustained `PATCH /api/v1/namespaces/.../pods/...` load (multiplied by the never-draining set), compounded by `_list_pods` cost at scale (#35599), longer scheduler loops, and tasks stalling in `scheduled`/`queued` under churn; same symptom class as #66396.
   - `delete_worker_pods=True` only changes the verb (repeated `DELETE` 404s instead of `PATCH`); it does not drain the set.
   - Workaround until fixed: raise `[scheduler] orphaned_tasks_check_interval` (e.g. `86400`) so the set is repopulated far less often (this does not drain an already-populated set; that requires a scheduler restart), and/or reduce scheduler restarts.
   - Lineage / related (all closed/merged; none drain `self.completed`): #57553, #61839 (10.15.0), #66396 / #66400 (10.17.1), #67891 / #67850 (10.18.0), #35599.
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

