This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 84642755ef9 KubernetesExecutor: scope periodic completed-pod adoption to dead schedulers (#66400)
84642755ef9 is described below
commit 84642755ef9b3d68e6d7b6a4af4554a21e19992c
Author: Jarek Potiuk <[email protected]>
AuthorDate: Tue May 5 22:18:59 2026 +0200
KubernetesExecutor: scope periodic completed-pod adoption to dead schedulers (#66400)
* KubernetesExecutor: scope periodic completed-pod adoption to dead schedulers
PR #61839 (cncf-kubernetes 10.15.0) added a periodic call to
`_adopt_completed_pods` from inside `KubernetesExecutor.sync()`, gated by
`[scheduler] orphaned_tasks_check_interval` (default 300 s). The query
selects every Succeeded pod whose `airflow-worker` label is not the current
scheduler's label and PATCHes it with the current scheduler's label so its
KubernetesJobWatcher will see the change and DELETE the pod.
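A minimal Python sketch of the selection rule described above shows why it cannot tell a dead scheduler's pod from a live sibling's. The helper names (`pre_fix_label_selector`, `matches_pre_fix`) are hypothetical; only the selector string mirrors the code in this commit, and the `airflow_executor_done` key is taken from the expected selectors in the new unit tests.

```python
# Minimal model of the pre-fix adoption selector (#61839). Function names are
# hypothetical; "airflow_executor_done" is the POD_EXECUTOR_DONE_KEY value the
# unit tests in this commit expect.
POD_EXECUTOR_DONE_KEY = "airflow_executor_done"


def pre_fix_label_selector(self_label: str) -> str:
    """Build the label selector the periodic adoption used before this fix."""
    return f"kubernetes_executor=True,airflow-worker!={self_label},{POD_EXECUTOR_DONE_KEY}!=True"


def matches_pre_fix(labels: dict, self_label: str) -> bool:
    """Evaluate that selector against one Succeeded pod's labels.

    Kubernetes treats `key!=value` as also matching pods that lack the key,
    so dict.get() (None for a missing key) models that behavior.
    """
    return (
        labels.get("kubernetes_executor") == "True"
        and labels.get("airflow-worker") != self_label
        and labels.get(POD_EXECUTOR_DONE_KEY) != "True"
    )


# Scheduler "5" is running; "7" is a live sibling, "3" died earlier.
pods = {
    "pod-from-dead-scheduler": {"kubernetes_executor": "True", "airflow-worker": "3"},
    "pod-from-live-sibling": {"kubernetes_executor": "True", "airflow-worker": "7"},
    "own-pod": {"kubernetes_executor": "True", "airflow-worker": "5"},
}
selected = sorted(name for name, labels in pods.items() if matches_pre_fix(labels, "5"))
print(selected)  # ['pod-from-dead-scheduler', 'pod-from-live-sibling']
```

Both foreign pods match, so the live sibling's pod is adopted exactly like the orphaned one.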
In multi-scheduler deployments this caused thrashing: on every
`orphaned_tasks_check_interval` tick, each scheduler iterated over every
Succeeded pod that did not carry its own label and PATCHed it. The
schedulers fought each other:
* Scheduler A relabels every Succeeded pod owned by B and C → A's watcher
DELETEs them.
* Scheduler B does the same a few seconds later → relabels A's freshly
patched pods to B → B's watcher takes over.
* Scheduler C does the same.
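The thrash can be reproduced with a deliberately simplified, hypothetical model: three schedulers apply the pre-fix rule back to back within one interval, and (as an assumption) the watcher's DELETE never lands before the next scheduler runs. This is not Airflow code; it only counts the PATCH calls a single tick can generate.

```python
from collections import Counter


def simulate_tick(pod_owners: dict, scheduler_order: list) -> Counter:
    """Apply the pre-fix adoption rule once per scheduler, in order.

    Simplification (assumption): the watcher's DELETE never lands between two
    schedulers' passes, so pods keep being relabeled instead of disappearing.
    """
    patches = Counter()
    for scheduler in scheduler_order:
        for pod, owner in pod_owners.items():
            if owner != scheduler:
                # Models PATCH airflow-worker=<scheduler> on a pod it does not own.
                pod_owners[pod] = scheduler
                patches[scheduler] += 1
    return patches


owners = {f"pod-{i}": s for i, s in enumerate(["A", "B", "C"] * 2)}
patches = simulate_tick(owners, ["A", "B", "C"])
print(dict(patches))  # {'A': 4, 'B': 6, 'C': 6}
print(set(owners.values()))  # {'C'}
```

With six pods spread evenly across A, B and C, one tick issues 16 PATCHes and every pod ends up owned by whichever scheduler ran last.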
At steady state with high pod churn, this showed up as heavy
PATCH /api/v1/namespaces/.../pods/... traffic, expensive `_list_pods` calls
on every interval tick (#35599 already documents `_list_pods` taking
15-30 s with 500 pods), and tasks stalling in `scheduled` / `queued`
because every scheduler loop was spending seconds inside `_list_pods` and
`patch_namespaced_pod` instead of scheduling. Setting
`delete_worker_pods=False` did NOT help: the periodic adoption code path
does not gate on it and ends in the watcher's delete.
Fix: scope the periodic adoption to pods owned by no-longer-alive
schedulers. New helper `_alive_other_scheduler_job_ids` queries the
`Job` table for SchedulerJobs whose `state == RUNNING` and whose
`latest_heartbeat` is within `[scheduler] scheduler_health_check_threshold`
(matching the alive-scheduler definition already used by
`SchedulerJobRunner.adopt_or_reset_orphaned_tasks`). The label selector
in `_adopt_completed_pods` is then built to exclude self + every alive
sibling using K8s set-based syntax `airflow-worker notin (a,b,c)`:
* Single-scheduler deployment: no behavior change. Helper returns empty
set, selector falls back to the original equality form
`airflow-worker!=<self_label>`.
* Multi-scheduler deployment: each scheduler only adopts pods whose
owning scheduler is gone — preserving the original goal of #61839
(cleanup after a scheduler restart) without the thrash.
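The alive-sibling definition and the selector choice above can be sketched without a database. `JobRow`, `alive_sibling_ids` and `worker_filter` are hypothetical names; the predicate and the equality/`notin` switch follow the diff in this commit, the `"running"` string is assumed to be the value of `JobState.RUNNING`, and 30 s is only an example threshold.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class JobRow:
    """Hypothetical in-memory stand-in for a row of Airflow's `job` table."""

    id: int
    job_type: str
    state: str  # assumed to hold JobState.RUNNING's value, "running"
    latest_heartbeat: datetime


def alive_sibling_ids(rows: list, self_id: int, now: datetime, threshold_s: int) -> set:
    """Same predicate as the commit's query: RUNNING, fresh heartbeat, not self."""
    cutoff = now - timedelta(seconds=threshold_s)
    return {
        r.id
        for r in rows
        if r.job_type == "SchedulerJob"
        and r.state == "running"
        and r.latest_heartbeat >= cutoff
        and r.id != self_id
    }


def worker_filter(self_label: str, sibling_labels: list) -> str:
    """Equality form when only self is excluded, set-based `notin` otherwise."""
    excluded = sorted({self_label, *sibling_labels})
    if len(excluded) == 1:
        return f"airflow-worker!={excluded[0]}"
    return f"airflow-worker notin ({','.join(excluded)})"


now = datetime(2026, 5, 5, 20, 0, tzinfo=timezone.utc)
rows = [
    JobRow(5, "SchedulerJob", "running", now),  # self
    JobRow(7, "SchedulerJob", "running", now - timedelta(seconds=10)),  # alive sibling
    JobRow(9, "SchedulerJob", "running", now - timedelta(seconds=20)),  # alive sibling
    JobRow(3, "SchedulerJob", "running", now - timedelta(seconds=600)),  # stale heartbeat
    JobRow(4, "SchedulerJob", "failed", now),  # not running
]
siblings = alive_sibling_ids(rows, self_id=5, now=now, threshold_s=30)
print(worker_filter("5", [str(j) for j in siblings]))  # airflow-worker notin (5,7,9)
print(worker_filter("5", []))  # airflow-worker!=5
```

Pods labeled `3` fall outside `notin (5,7,9)`, so scheduler 5 adopts them, while pods owned by 7 and 9 are left to their live owners.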
If the DB query fails, the helper returns an empty set so the caller
falls back to the pre-#61839 "exclude self only" selector — a transient
DB issue must not break completed-pod cleanup.
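A sketch of that fallback, assuming the query is passed in as a plain callable; `alive_ids_or_empty` and `flaky_query` are hypothetical names, not part of the provider.

```python
import logging
from typing import Callable, Iterable

log = logging.getLogger("adoption-sketch")


def alive_ids_or_empty(query: Callable[[], Iterable[int]]) -> set:
    """Run the sibling-scheduler query; on any error, log and return an empty set.

    An empty set makes the selector fall back to `airflow-worker!=<self>`,
    i.e. the exclude-self-only behavior.
    """
    try:
        # set() consumes the iterable inside the try, so errors raised while
        # iterating a lazy result are caught as well.
        return set(query())
    except Exception as exc:  # broad on purpose, like the helper in this commit
        log.warning("Could not query alive SchedulerJobs: %s. Falling back to exclude-self-only.", exc)
        return set()


def flaky_query():
    # Hypothetical query that hits a transient DB outage.
    raise ConnectionError("database unavailable")


print(alive_ids_or_empty(flaky_query))  # set()
print(sorted(alive_ids_or_empty(lambda: [7, 9])))  # [7, 9]
```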
Two new unit tests cover the multi-scheduler set-based selector and
confirm the single-scheduler equality form is unchanged. Existing
`test_adopt_completed_pods` and `test_adopt_completed_pods_api_exception`
keep their original assertions because the new helper falls back to an
empty set when `scheduler_job_id` is the test's non-numeric string.
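The numeric guard that keeps those existing tests stable can be illustrated in isolation. `parse_self_id` is a hypothetical name for the `int(self.scheduler_job_id)` conversion inside the new helper; `"modified"` is the non-numeric id the new single-scheduler test uses.

```python
def parse_self_id(scheduler_job_id):
    """Mirror of the helper's defensive int() conversion.

    Returns None for a non-numeric id; the commit's helper returns an empty
    set in that case, so the selector keeps its equality form.
    """
    try:
        return int(scheduler_job_id)
    except (TypeError, ValueError):
        return None


print(parse_self_id("5"))  # 5
print(parse_self_id("modified"))  # None
print(parse_self_id(None))  # None
```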
Closes: #66396
* Iterate sibling-scheduler query result instead of materializing list
Address @jscheffl's first review point on #66400: previously the
`_alive_other_scheduler_job_ids` helper called `.all()` to materialize
the SQLAlchemy scalar result into a `list[int]` before passing it to
`set(...)`. Switch to a set-comprehension over the scalar cursor so
no intermediate list is built.
The functional behavior is identical; this just avoids building a
throwaway list alongside the resulting set (the set itself still holds one
id per alive sibling scheduler). In practice the query returns only a
handful of small `int` rows in any realistic Airflow HA deployment, so the
saving is small, but the cleaner pattern matches the review feedback
exactly.
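The difference between the two patterns, sketched with a plain generator standing in for SQLAlchemy's scalar result; `fake_scalar_cursor` and the two wrapper functions are hypothetical.

```python
from typing import Iterable, Iterator


def fake_scalar_cursor(ids: Iterable[int]) -> Iterator[int]:
    """Hypothetical stand-in for a scalar result: yields one job id at a time."""
    for job_id in ids:
        yield job_id


def ids_via_list(ids: Iterable[int]) -> set:
    # Analogous to `.all()` followed by set(): a full list is built first.
    rows = list(fake_scalar_cursor(ids))
    return set(rows)


def ids_via_set_comprehension(ids: Iterable[int]) -> set:
    # Analogous to the revised helper: ids go straight into the set.
    return {jid for jid in fake_scalar_cursor(ids)}


print(sorted(ids_via_set_comprehension([7, 9, 7])))  # [7, 9]
```

`set(cursor)` would avoid the intermediate list just as well; the comprehension is simply the form the review asked for.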
* Update providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
Co-authored-by: Jens Scheffler <[email protected]>
---------
Co-authored-by: Jens Scheffler <[email protected]>
---
.../kubernetes/executors/kubernetes_executor.py | 106 +++++++++++++++++++--
.../executors/test_kubernetes_executor.py | 73 ++++++++++++++
2 files changed, 172 insertions(+), 7 deletions(-)
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
index 8320c566e0b..d109ac097dd 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
@@ -681,22 +681,114 @@ class KubernetesExecutor(BaseExecutor):
del tis_to_flush_by_key[ti_key]
self.running.add(ti_key)
+ def _alive_other_scheduler_job_ids(self) -> set[int]:
+ """
+ Return job IDs of every SchedulerJob that is currently alive — excluding self.
+
+ "Alive" means ``Job.state == RUNNING`` AND its ``latest_heartbeat`` is
+ within ``[scheduler] scheduler_health_check_threshold``.
+
+ Used by ``_adopt_completed_pods`` to scope cross-scheduler pod
+ adoption to pods owned by no-longer-alive schedulers (#66396).
+ With a single scheduler the returned set is always empty — the
+ original "exclude self only" behavior is preserved. With multiple
+ schedulers each one only adopts pods whose owning scheduler is gone,
+ eliminating the relabel-thrash that PR #61839 introduced.
+
+ Returns an empty set on any DB error so the caller falls back to
+ the pre-#61839 "exclude self only" selector — a transient DB issue
+ must not break completed-pod cleanup.
+ """
+ if TYPE_CHECKING:
+ assert self.scheduler_job_id
+
+ try:
+ self_id = int(self.scheduler_job_id)
+ except (TypeError, ValueError):
+ # Tests sometimes set scheduler_job_id to a non-numeric string.
+ # In production it's always Job.id (int), but be defensive.
+ return set()
+
+ try:
+ from datetime import timedelta
+
+ from sqlalchemy import select
+
+ from airflow.jobs.job import Job
+ from airflow.utils import timezone
+ from airflow.utils.session import create_session
+ from airflow.utils.state import JobState
+
+ timeout = conf.getint("scheduler", "scheduler_health_check_threshold")
+ cutoff = timezone.utcnow() - timedelta(seconds=timeout)
+ with create_session() as session:
+ # Iterate the scalar cursor straight into the set so we never
+ # materialize an intermediate list; the set itself still
+ # holds one id per alive sibling scheduler
+ return {
+ jid
+ for jid in session.scalars(
+ select(Job.id).where(
+ Job.job_type == "SchedulerJob",
+ Job.state == JobState.RUNNING,
+ Job.latest_heartbeat >= cutoff,
+ Job.id != self_id,
+ )
+ )
+ }
+ except Exception as exc:
+ self.log.warning(
+ "Could not query alive SchedulerJobs for completed-pod adoption "
+ "scoping: %s. Falling back to exclude-self-only.",
+ exc,
+ )
+ return set()
+
def _adopt_completed_pods(self, kube_client: client.CoreV1Api) -> None:
"""
- Patch completed pods so that the KubernetesJobWatcher can delete them.
+ Patch completed pods owned by no-longer-alive schedulers so this scheduler's watcher can delete them.
+
+ Originally this method patched every Succeeded pod that did not carry
+ THIS scheduler's ``airflow-worker`` label. With multi-scheduler
+ deployments that caused thrashing — every scheduler relabeled every
+ other scheduler's completed pods on each interval tick, fighting over
+ ownership and burning kube-API and watcher cycles (see #66396).
+
+ The fix scopes the selector to also exclude pods owned by every
+ currently-alive sibling scheduler. With one scheduler, behavior is
+ unchanged (no siblings → original "exclude self only" selector). With
+ multiple schedulers, each one only adopts pods whose owning scheduler
+ is gone — preserving the original goal of #61839 (cleanup after a
+ scheduler restart) without the multi-scheduler regression.
:param kube_client: kubernetes client for speaking to kube API
"""
if TYPE_CHECKING:
assert self.scheduler_job_id
- new_worker_id_label = self._make_safe_label_value(self.scheduler_job_id)
+ self_label = self._make_safe_label_value(self.scheduler_job_id)
+ excluded_labels = sorted(
+ {
+ self_label,
+ *(self._make_safe_label_value(str(jid)) for jid in self._alive_other_scheduler_job_ids()),
+ }
+ )
+
+ if len(excluded_labels) == 1:
+ # Equality-based selector — preserves the pre-fix label_selector
+ # exactly when no sibling scheduler is alive, so single-scheduler
+ # deployments see no behavior change.
+ worker_filter = f"airflow-worker!={excluded_labels[0]}"
+ else:
+ # Set-based requirement: K8s parses `notin (a,b,c)` as "label
+ # value is none of these". Mixed with the surrounding
+ # equality-based requirements via comma separator.
+ worker_filter = f"airflow-worker notin ({','.join(excluded_labels)})"
+
query_kwargs = {
"field_selector": "status.phase=Succeeded",
- "label_selector": (
- "kubernetes_executor=True,"
- f"airflow-worker!={new_worker_id_label},{POD_EXECUTOR_DONE_KEY}!=True"
- ),
+ "label_selector": (f"kubernetes_executor=True,{worker_filter},{POD_EXECUTOR_DONE_KEY}!=True"),
}
pod_list = self._list_pods(query_kwargs)
for pod in pod_list:
@@ -707,7 +799,7 @@ class KubernetesExecutor(BaseExecutor):
kube_client.patch_namespaced_pod(
name=pod.metadata.name,
namespace=pod.metadata.namespace,
- body={"metadata": {"labels": {"airflow-worker": new_worker_id_label}}},
+ body={"metadata": {"labels": {"airflow-worker": self_label}}},
)
except ApiException as e:
self.log.info("Failed to adopt pod %s. Reason: %s", pod.metadata.name, e)
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
index 33036069d36..9805670ec81 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
@@ -1383,6 +1383,79 @@ class TestKubernetesExecutor:
assert len(pod_names) == mock_kube_client.patch_namespaced_pod.call_count
assert executor.running == set()
+ @mock.patch(
+ "airflow.providers.cncf.kubernetes.executors.kubernetes_executor."
+ "KubernetesExecutor._alive_other_scheduler_job_ids"
+ )
+ @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
+ @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
+ def test_adopt_completed_pods_excludes_alive_siblings(
+ self, mock_kube_client, mock_kube_dynamic_client, mock_alive_ids
+ ):
+ """
+ With multiple alive schedulers, the label selector must exclude pods owned
+ by every alive sibling — not just self — so the schedulers do not thrash
+ relabeling each other's completed pods (see #66396).
+ """
+ mock_alive_ids.return_value = {7, 9}
+ executor = self.kubernetes_executor
+ executor.scheduler_job_id = "5"
+ executor.kube_client = mock_kube_client
+ mock_kube_dynamic_client.return_value = mock.MagicMock()
+ mock_pod_resource = mock.MagicMock()
+ mock_kube_dynamic_client.return_value.resources.get.return_value = mock_pod_resource
+ mock_kube_dynamic_client.return_value.get.return_value.items = []
+ executor.kube_config.kube_namespace = "somens"
+
+ executor._adopt_completed_pods(mock_kube_client)
+
+ # Selector must use set-based `notin (...)` listing self + every alive sibling,
+ # sorted for determinism. Anything outside that set is a pod whose owning
+ # scheduler is gone and therefore safe for this scheduler to adopt.
+ mock_kube_dynamic_client.return_value.get.assert_called_once_with(
+ resource=mock_pod_resource,
+ namespace="somens",
+ field_selector="status.phase=Succeeded",
+ label_selector=(
+ "kubernetes_executor=True,airflow-worker notin (5,7,9),airflow_executor_done!=True"
+ ),
+ header_params={"Accept": "application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"},
+ )
+
+ @mock.patch(
+ "airflow.providers.cncf.kubernetes.executors.kubernetes_executor."
+ "KubernetesExecutor._alive_other_scheduler_job_ids"
+ )
+ @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
+ @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
+ def test_adopt_completed_pods_single_scheduler_unchanged(
+ self, mock_kube_client, mock_kube_dynamic_client, mock_alive_ids
+ ):
+ """
+ With only one scheduler (no alive siblings), the selector must remain
+ identical to the pre-#66396-fix equality form so single-scheduler
+ deployments see no behavior change.
+ """
+ mock_alive_ids.return_value = set()
+ executor = self.kubernetes_executor
+ executor.scheduler_job_id = "modified"
+ executor.kube_client = mock_kube_client
+ mock_kube_dynamic_client.return_value = mock.MagicMock()
+ mock_pod_resource = mock.MagicMock()
+ mock_kube_dynamic_client.return_value.resources.get.return_value = mock_pod_resource
+ mock_kube_dynamic_client.return_value.get.return_value.items = []
+ executor.kube_config.kube_namespace = "somens"
+
+ executor._adopt_completed_pods(mock_kube_client)
+
+ mock_kube_dynamic_client.return_value.get.assert_called_once_with(
+ resource=mock_pod_resource,
+ namespace="somens",
+ field_selector="status.phase=Succeeded",
+ label_selector="kubernetes_executor=True,airflow-worker!=modified,airflow_executor_done!=True",
+ header_params={"Accept": "application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"},
+ )
+
@mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
def test_not_adopt_unassigned_task(self, mock_kube_client):
"""