alixirs opened a new issue, #68683:
URL: https://github.com/apache/airflow/issues/68683
### Apache Airflow Provider(s)
cncf-kubernetes
### Versions of Apache Airflow Providers
`apache-airflow-providers-cncf-kubernetes==10.18.0` (latest at time of
filing).
The flaw is present in the current code on `main` as well. Relevant history:
- The `self.completed` set and the `for result in self.completed: self._change_state(result)` loop were introduced in **10.10.0**.
- The **periodic** `_adopt_completed_pods()` call from `sync()` (which
regularly fills the set) was added in **10.15.0** (#61839).
### Apache Airflow version
Observed on **2.11.0**. The affected code is provider code
(`KubernetesExecutor`), identical on `main`, so 3.x is equally affected.
### Operating System
N/A (Python code path). Observed on Azure AKS.
### Deployment
Other 3rd-party Helm chart
### Deployment details
Kubernetes, `KubernetesExecutor`, `delete_worker_pods=False`. Environment
has completed (Succeeded) pods carrying an `airflow-worker` label different
from the current scheduler's job id (multi-scheduler HA and/or scheduler
restarts).
### What happened
On a busy scheduler, `patch_pod_executor_done()` is called repeatedly, on every scheduler loop, against the **same** already-completed pods. In a ~96-second window we saw:
- `patch_pod_executor_done` (`"Patching pod ... to mark it as done"`):
**525** calls
- `_change_state` patch log (`"Patched pod ... to mark it as done"`): **525**
- `sync()` `"Changing state of ..."` (logged once per real result): **30**
- `"Attempting to finish pod"` / `"finishing job"` (watcher → result_queue):
**30** / **30**
- `"Deleted pod ..."`: **0** (confirms `delete_worker_pods=False`)
- Distinct pods patched: **32**; the busiest pods were patched **exactly 30
times each**.
`525` patches with only `30` `"Changing state of"` lines would be impossible if `_change_state` were reached only via the result queue, because every result-queue item logs `"Changing state of"` first. The remaining ~495 calls (525 - 30) therefore come from the un-logged `self.completed` loop.
### Root cause
`KubernetesExecutor.sync()` re-runs `_change_state()` over the entire
`self.completed` set, and **nothing ever removes entries from that set**.
`providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py`:
```python
def sync(self) -> None:
    ...
    # (periodic) populate self.completed from completed pods owned by other/dead schedulers
    if now - self._last_completed_pod_adoption >= adoption_interval:  # L273
        self._last_completed_pod_adoption = now
        self._adopt_completed_pods(self.kube_client)  # L275
    ...
    with contextlib.suppress(Empty):
        while True:
            results = self.result_queue.get_nowait()
            try:
                ...
                self.log.info("Changing state of %s to %s", results, results.state)  # L289
                self._change_state(results)  # L291
            finally:
                self.result_queue.task_done()
            for result in self.completed:  # L303 <-- inside the while loop
                self._change_state(result)  # L304 <-- never logged, never drained
```
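The multiplication is easier to see in a stripped-down model. The sketch below is hypothetical and is not the provider's code: `NestedLoopModel` and `patch_calls` are illustrative names, and `_change_state` is reduced to a counter on the simplifying assumption that every call ends in exactly one PATCH. Only the loop nesting from the excerpt above is kept.

```python
import contextlib
from queue import Empty, Queue


class NestedLoopModel:
    """Hypothetical model of the nested loop in KubernetesExecutor.sync()."""

    def __init__(self) -> None:
        self.result_queue: Queue = Queue()
        self.completed: set = set()
        self.patch_calls = 0  # stands in for patch_pod_executor_done() calls

    def _change_state(self, result) -> None:
        # Simplification: every call, real or adopted, costs one PATCH.
        self.patch_calls += 1

    def sync(self) -> None:
        with contextlib.suppress(Empty):
            while True:
                results = self.result_queue.get_nowait()  # raises Empty when drained
                try:
                    self._change_state(results)
                finally:
                    self.result_queue.task_done()
                # Mirrors L303-L304: this runs once per dequeued result.
                for result in self.completed:
                    self._change_state(result)


executor = NestedLoopModel()
executor.completed = {f"adopted-{i}" for i in range(16)}
for i in range(30):
    executor.result_queue.put(f"result-{i}")
executor.sync()
print(executor.patch_calls)  # prints 510 (30 + 30 * 16)
```

With an empty result queue the inner `for` loop never runs at all, which matches the observation that the re-patching happens on every `sync()` that processes at least one watcher result.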
`_adopt_completed_pods()` adds entries to the set, but nothing ever clears or discards them:
```python
ti_id = annotations_to_key(pod.metadata.annotations)
self.completed.add(  # L816
    KubernetesResults(key=ti_id, state="completed", ...)
)
```
For these synthetic `state="completed"` entries, `_change_state()` patches the pod; then `self.running.remove(key)` raises `KeyError` (they were never in `running`) and the method returns early, so the entry is never removed from `self.completed`:
```python
else:
    self.kube_scheduler.patch_pod_executor_done(pod_name=pod_name, namespace=namespace)  # L482
    self.log.info("Patched pod %s in namespace %s to mark it as done", key, namespace)  # L483

try:
    self.running.remove(key)  # L486
except KeyError:
    self.log.debug("TI key not in running, not adding to event_buffer: %s", key)
    return  # L489 <-- no removal from self.completed
```
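A second hypothetical model isolates the early return. `AdoptedPodModel` is an illustrative name; its `_change_state` keeps only the two steps quoted above (an unconditional PATCH, then `self.running.remove(key)`), and the outer loop stands in for three `sync()` passes.

```python
class AdoptedPodModel:
    """Hypothetical model of the tail of _change_state() for adopted pods."""

    def __init__(self) -> None:
        self.running: set[str] = set()
        self.completed: set[str] = set()
        self.patched: list[str] = []  # stands in for patch_pod_executor_done()

    def _change_state(self, key: str) -> None:
        self.patched.append(key)  # the PATCH happens before any bookkeeping
        try:
            self.running.remove(key)
        except KeyError:
            # Adopted pods were never in `running`, so they always exit here,
            # and nothing on this path touches self.completed.
            return
        # Keys that were in `running` would continue on; not modelled here.


executor = AdoptedPodModel()
executor.completed = {"pod-a", "pod-b"}
for _ in range(3):  # three sync() passes
    for key in executor.completed:
        executor._change_state(key)

print(len(executor.completed), len(executor.patched))  # prints: 2 6
```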
Two compounding defects:
1. **`self.completed` is never drained.** It is only ever declared (L117), iterated (L303), and added to (L816); there is no `.clear()`, `.discard()`, `.remove()`, or reassignment anywhere (verified in 10.18.0 and on `main`). So every adopted completed pod is re-PATCHed until the scheduler restarts, and the set grows monotonically over the scheduler's lifetime.
2. **The loop is nested inside the result-queue `while True`.** It runs once
**per dequeued result**, not once per `sync()`. PATCH volume ≈ `(#results
processed) × |self.completed|` (the `30 passes × ~16 pods ≈ ~500` we observed).
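Plugging the observed counts into that estimate, with $R$ the number of dequeued results (30) and $C$ the `self.completed` set, and assuming $|C|$ stays roughly constant over the window:

```math
\begin{aligned}
P_{\text{total}} &= R + R\,|C| \\
P_{\text{extra}} &= P_{\text{total}} - R = 525 - 30 = 495 \\
|C| &\approx \frac{P_{\text{extra}}}{R} = \frac{495}{30} = 16.5
\end{aligned}
```

An average of about 16.5 adopted entries is consistent with the busiest pods being patched exactly 30 times each, i.e. once per dequeued result.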
Permalinks (10.18.0), base
`https://github.com/apache/airflow/blob/providers-cncf-kubernetes/10.18.0/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py`:
- Declaration: `#L117`
- Loop: `#L303-L304`
- Add: `#L816`
### What you think should happen instead
Adopted completed pods should be processed **once** and then dropped:
- Move the `for result in self.completed` loop **out** of the result-queue
`while True` so it runs once per `sync()`.
- Drain the set after processing: `self.completed.clear()`, or `discard` each entry (conservatively, only after a successful patch/delete, since `patch_pod_executor_done` swallows `ApiException`).
Sketch:
```python
with contextlib.suppress(Empty):
    while True:
        results = self.result_queue.get_nowait()
        try:
            ...
            self._change_state(results)
        finally:
            self.result_queue.task_done()

# Process adopted completed pods once per sync, then drain.
if self.completed:
    for result in list(self.completed):
        self._change_state(result)
    self.completed.clear()
```
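A minimal model of the proposed ordering, combined with the conservative "discard only after success" variant. It assumes a hypothetical `_change_state` that returns `True` on success; the real method returns `None` and `patch_pod_executor_done` swallows `ApiException`, so a real fix would need some other way to learn whether the patch worked. `FixedSyncModel`, `failing`, and `patch_calls` are illustrative names only.

```python
import contextlib
from collections import Counter
from queue import Empty, Queue


class FixedSyncModel:
    """Hypothetical model: adopted pods processed once per sync, then drained."""

    def __init__(self, failing: frozenset = frozenset()) -> None:
        self.result_queue: Queue = Queue()
        self.completed: set[str] = set()
        self.failing = failing  # keys whose PATCH we pretend fails
        self.patch_calls: Counter = Counter()

    def _change_state(self, key: str) -> bool:
        self.patch_calls[key] += 1
        return key not in self.failing  # hypothetical success flag

    def sync(self) -> None:
        with contextlib.suppress(Empty):
            while True:
                results = self.result_queue.get_nowait()
                try:
                    self._change_state(results)
                finally:
                    self.result_queue.task_done()

        # Outside the while loop: runs once per sync(), not once per result.
        for key in list(self.completed):  # copy, because we mutate the set
            if self._change_state(key):
                self.completed.discard(key)


model = FixedSyncModel(failing=frozenset({"pod-c"}))
model.completed = {"pod-a", "pod-b", "pod-c"}
for i in range(30):
    model.result_queue.put(f"result-{i}")
model.sync()
model.sync()
print(model.completed)  # prints {'pod-c'}: only the failed patch is kept
print(model.patch_calls["pod-a"], model.patch_calls["pod-c"])  # prints: 1 2
```

The trade-off of the conservative variant is that a pod whose patch keeps failing is retried on every `sync()`, but only once per sync rather than once per dequeued result; plain `self.completed.clear()` avoids even that at the cost of never retrying.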
### How to reproduce
1. `KubernetesExecutor`, `delete_worker_pods=False`, provider `>= 10.15.0`.
2. Produce Succeeded pods whose `airflow-worker` label differs from the
current scheduler's job id — e.g. restart the scheduler (new job id) while
Succeeded pods from the previous incarnation remain, or run ≥2 schedulers.
3. Wait one `[scheduler] orphaned_tasks_check_interval` tick (default 300s)
for `_adopt_completed_pods` to populate `self.completed`.
4. Observe: on every subsequent `sync()` that processes any watcher result,
every pod in `self.completed` is re-PATCHed (`"Patching pod ... to mark it as
done"`), and the set never shrinks until the scheduler restarts.
Quick check against scheduler logs:
```bash
grep -c "Patching pod" scheduler.log        # high
grep -c "Changing state of" scheduler.log   # much lower
grep "Patching pod" scheduler.log \
  | sed -E 's/.*Patching pod ([a-z0-9-]+) in namespace.*/\1/' \
  | sort | uniq -c | sort -rn | head        # same pods, patched many times each
```
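Where `grep`/`sed` are not handy, a rough Python equivalent of the same check. It assumes the log lines contain the same `Patching pod <name> in namespace ...` and `Changing state of` substrings the shell pipeline matches; `summarize` is a hypothetical helper, not part of Airflow.

```python
import re
from collections import Counter
from typing import Iterable

# Same pod-name pattern as the sed expression above.
PATCHING = re.compile(r"Patching pod ([a-z0-9-]+) in namespace")


def summarize(lines: Iterable[str]) -> tuple[int, int, Counter]:
    """Return (#"Patching pod" lines, #"Changing state of" lines, patches per pod)."""
    patches = 0
    changes = 0
    per_pod: Counter = Counter()
    for line in lines:
        if "Patching pod" in line:  # like `grep -c "Patching pod"`
            patches += 1
            match = PATCHING.search(line)
            if match:
                per_pod[match.group(1)] += 1
        if "Changing state of" in line:  # like `grep -c "Changing state of"`
            changes += 1
    return patches, changes, per_pod
```

For example, `with open("scheduler.log", encoding="utf-8") as fh: print(summarize(fh)[2].most_common(10))` lists the ten most-patched pods with their counts.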
### Anything else
- Impact: sustained `PATCH /api/v1/namespaces/.../pods/...` load (multiplied by the never-draining set), compounded by `_list_pods` cost at scale (#35599), longer scheduler loops, and tasks stalling in `scheduled`/`queued` under churn. This is the same symptom class as #66396.
- `delete_worker_pods=True` only changes the verb (repeated `DELETE` 404s
instead of `PATCH`); it does not drain the set.
- Workaround until fixed: raise `[scheduler] orphaned_tasks_check_interval` (e.g. `86400`) to stop repopulating the set, and/or reduce scheduler restarts. This does not drain an already-populated set; that requires a scheduler restart.
- Lineage / related (all closed/merged; none drain `self.completed`):
#57553, #61839 (10.15.0), #66396 / #66400 (10.17.1), #67891 / #67850 (10.18.0),
#35599.
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)