This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 84642755ef9 KubernetesExecutor: scope periodic completed-pod adoption to dead schedulers (#66400)
84642755ef9 is described below

commit 84642755ef9b3d68e6d7b6a4af4554a21e19992c
Author: Jarek Potiuk <[email protected]>
AuthorDate: Tue May 5 22:18:59 2026 +0200

    KubernetesExecutor: scope periodic completed-pod adoption to dead schedulers (#66400)
    
    * KubernetesExecutor: scope periodic completed-pod adoption to dead schedulers
    
    PR #61839 (cncf-kubernetes 10.15.0) added a periodic call to
    `_adopt_completed_pods` from inside `KubernetesExecutor.sync()`, gated by
    `[scheduler] orphaned_tasks_check_interval` (default 300 s). The query
    selects every Succeeded pod whose `airflow-worker` label is not the current
    scheduler's label and PATCHes it with the current scheduler's label so its
    KubernetesJobWatcher will see the change and DELETE the pod.
    
    With multi-scheduler deployments that caused thrashing — every
    `orphaned_tasks_check_interval` each scheduler iterated over every Succeeded
    pod that did not carry its own label and PATCHed it. Schedulers fought each
    other:
    
      * Scheduler A relabels every Succeeded pod owned by B and C → A's watcher
        DELETEs them.
      * Scheduler B does the same a few seconds later → relabels A's freshly
        patched pods to B → B's watcher takes over.
      * Scheduler C does the same.
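The ping-pong in the bullets above can be reproduced with a toy model. This is a minimal sketch, not Airflow code: `old_adopt` is a hypothetical function, pods are a plain dict mapping pod name to the owning `airflow-worker` label, and the watchers' DELETEs are ignored (the worst case, where a watcher lags behind the next scheduler's tick), so it only counts PATCHes.

```python
def old_adopt(pods: dict[str, str], self_label: str) -> int:
    """Hypothetical model of the pre-fix pass: relabel every pod not owned by self_label."""
    patched = 0
    for name, owner in pods.items():
        if owner != self_label:  # the old "airflow-worker!=<self>" selector
            pods[name] = self_label  # stands in for patch_namespaced_pod
            patched += 1
    return patched


# Two Succeeded pods per scheduler; deletions by the watchers are ignored.
pods = {f"pod-{i}": owner for i, owner in enumerate(["A", "B", "C"] * 2)}
for scheduler in ["A", "B", "C"]:
    print(scheduler, "patched", old_adopt(pods, scheduler))
```

This prints `A patched 4`, `B patched 6` and `C patched 6`: once the first tick has moved everything to A, every later tick relabels all six pods again.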
    
    At steady state with high pod churn this manifested as heavy
    PATCH /api/v1/namespaces/.../pods/... traffic, expensive `_list_pods` calls
    on every interval tick (#35599 already documents that this takes 15-30 s
    with 500 pods), and tasks stalling in `scheduled` / `queued` because every
    scheduler loop was burning seconds inside `_list_pods` and
    `patch_namespaced_pod` instead of doing useful scheduling. Setting
    `delete_worker_pods=False` did NOT help: the periodic adoption code path
    does not check it, and the deletion goes through the watcher.
    
    Fix: scope the periodic adoption to pods owned by no-longer-alive
    schedulers. New helper `_alive_other_scheduler_job_ids` queries the
    `Job` table for SchedulerJobs whose `state == RUNNING` and whose
    `latest_heartbeat` is within `[scheduler] scheduler_health_check_threshold`
    (matching the alive-scheduler definition already used by
    `SchedulerJobRunner.adopt_or_reset_orphaned_tasks`). The label selector
    in `_adopt_completed_pods` is then built to exclude self + every alive
    sibling using K8s set-based syntax `airflow-worker notin (a,b,c)`:
    
      * Single-scheduler deployment: no behavior change. Helper returns empty
        set, selector falls back to the original equality form
        `airflow-worker!=<self_label>`.
      * Multi-scheduler deployment: each scheduler only adopts pods whose
        owning scheduler is gone — preserving the original goal of #61839
        (cleanup after a scheduler restart) without the thrash.
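The selector construction described above can be sketched outside the executor. This is a standalone approximation under stated assumptions: `build_worker_filter`, `matches` and `pod_labels` are hypothetical helpers, labels are assumed to already be safe label values (the diff passes each through `_make_safe_label_value`), and `matches` implements only the three requirement forms used here (`k=v`, `k!=v`, `k notin (...)`), following the Kubernetes rule that `!=` and `notin` also match objects that lack the key.

```python
import re

# Hypothetical helpers for illustration; not Airflow or Kubernetes client APIs.


def build_worker_filter(self_label: str, alive_sibling_labels: set[str]) -> str:
    """Build the airflow-worker requirement the way the diff does."""
    excluded = sorted({self_label, *alive_sibling_labels})
    if len(excluded) == 1:
        # No alive siblings: keep the original equality form.
        return f"airflow-worker!={excluded[0]}"
    # Self plus alive siblings: set-based form, sorted for a stable string.
    return f"airflow-worker notin ({','.join(excluded)})"


def matches(selector: str, labels: dict[str, str]) -> bool:
    """Evaluate a selector built only from k=v, k!=v and k notin (...) requirements."""
    # Split on commas that are not inside a parenthesised value list.
    for requirement in re.split(r",(?![^(]*\))", selector):
        notin = re.fullmatch(r"(\S+) notin \((.*)\)", requirement)
        if notin:
            # A missing key also satisfies notin, as in Kubernetes.
            if labels.get(notin.group(1)) in notin.group(2).split(","):
                return False
        elif "!=" in requirement:
            key, value = requirement.split("!=", 1)
            # A missing key also satisfies !=, as in Kubernetes.
            if labels.get(key) == value:
                return False
        else:
            key, value = requirement.split("=", 1)
            if labels.get(key) != value:
                return False
    return True


def pod_labels(owner: str) -> dict[str, str]:
    """Labels of a hypothetical Succeeded executor pod owned by `owner`."""
    return {"kubernetes_executor": "True", "airflow-worker": owner}


worker_filter = build_worker_filter("5", {"7", "9"})
selector = f"kubernetes_executor=True,{worker_filter},airflow_executor_done!=True"
print(selector)
# Scheduler 5 adopts only pods whose owner is not alive: prints ['3'].
print([owner for owner in ["3", "5", "7", "9"] if matches(selector, pod_labels(owner))])
```

The first line printed is the same string the new multi-scheduler unit test expects, and `build_worker_filter("modified", set())` returns the unchanged `airflow-worker!=modified` form.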
    
    If the DB query fails, the helper returns an empty set so the caller
    falls back to the pre-#61839 "exclude self only" selector — a transient
    DB issue must not break completed-pod cleanup.
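The fail-open rule above can be illustrated with a small wrapper. This is only a sketch: `alive_sibling_ids_or_empty` and the `query` callable are hypothetical stand-ins for the helper and its `Job` query. It mirrors the diff's two escape hatches, a non-numeric `scheduler_job_id` and any exception raised by the query, both of which yield an empty set.

```python
import logging
from collections.abc import Callable

log = logging.getLogger(__name__)


def alive_sibling_ids_or_empty(
    scheduler_job_id: str, query: Callable[[int], set[int]]
) -> set[int]:
    """Hypothetical wrapper: return query(self_id), or an empty set on any failure."""
    try:
        self_id = int(scheduler_job_id)
    except (TypeError, ValueError):
        # Non-numeric IDs (as some unit tests use) skip the query entirely.
        return set()
    try:
        return query(self_id)
    except Exception as exc:
        # Fail open: an empty set makes the caller use "exclude self only".
        log.warning("Could not query alive SchedulerJobs: %s. Falling back to exclude-self-only.", exc)
        return set()


def flaky_query(self_id: int) -> set[int]:
    """Stand-in for a Job query that hits a transient database error."""
    raise ConnectionError("database unavailable")


print(sorted(alive_sibling_ids_or_empty("5", lambda self_id: {5, 7, 9} - {self_id})))  # [7, 9]
print(alive_sibling_ids_or_empty("5", flaky_query))  # set()
print(alive_sibling_ids_or_empty("modified", flaky_query))  # set()
```

Returning an empty set rather than re-raising means a DB outage degrades to the broader selector instead of skipping completed-pod cleanup for that tick.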
    
    Two new unit tests cover the multi-scheduler set-based selector and
    confirm the single-scheduler equality form is unchanged. Existing
    `test_adopt_completed_pods` and `test_adopt_completed_pods_api_exception`
    keep their original assertions because the new helper falls back to an
    empty set when `scheduler_job_id` is the test's non-numeric string.
    
    Closes: #66396
    
    * Iterate sibling-scheduler query result instead of materializing list
    
    Address @jscheffl's first review point on #66400: previously the
    `_alive_other_scheduler_job_ids` helper called `.all()` to materialize
    the SQLAlchemy scalar result into a `list[int]` before passing it to
    `set(...)`. Switch to a set-comprehension over the scalar cursor so
    no intermediate list is built.
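The same pattern can be tried against any DB-API cursor. This sketch uses the standard-library `sqlite3` module as a stand-in for the SQLAlchemy session. The `job` table, its lowercase `'running'` state value and the epoch-seconds heartbeats are simplified assumptions modelled on the `Job` filter in the diff, and the 30 s threshold is only an example value. The set comprehension consumes rows straight from the cursor, whereas `fetchall()` (like `.all()`) would first build a full list.

```python
import sqlite3


def alive_other_scheduler_ids(
    conn: sqlite3.Connection, self_id: int, threshold_s: float, now: float
) -> set[int]:
    """Return IDs of running SchedulerJobs with a recent heartbeat, excluding self_id."""
    cutoff = now - threshold_s
    cursor = conn.execute(
        "SELECT id FROM job WHERE job_type = 'SchedulerJob' AND state = 'running' "
        "AND latest_heartbeat >= ? AND id != ?",
        (cutoff, self_id),
    )
    # Consume rows straight from the cursor into the set; no fetchall() list.
    return {job_id for (job_id,) in cursor}


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE job (id INTEGER PRIMARY KEY, job_type TEXT, state TEXT, latest_heartbeat REAL)"
)
now = 1_000_000.0  # heartbeats stored as epoch seconds for simplicity
conn.executemany(
    "INSERT INTO job VALUES (?, ?, ?, ?)",
    [
        (5, "SchedulerJob", "running", now - 1),  # self
        (7, "SchedulerJob", "running", now - 10),  # alive sibling
        (9, "SchedulerJob", "running", now - 120),  # stale heartbeat: treated as dead
        (11, "SchedulerJob", "failed", now - 1),  # not running
        (12, "TriggererJob", "running", now - 1),  # not a scheduler
    ],
)
print(alive_other_scheduler_ids(conn, self_id=5, threshold_s=30, now=now))  # {7}
```

Scheduler 9 is still `running` in the table but its heartbeat is older than the threshold, so its pods stay adoptable, which is the behavior the fix relies on.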
    
    The functional behavior is identical; this just avoids building an
    intermediate list before the set, however many sibling schedulers are
    alive at the moment of the query. In practice the query returns only a
    handful of small `int` rows in any realistic Airflow HA deployment, so
    the saving is small, but the cleaner pattern matches the review feedback
    exactly.
    
    * Update providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
    
    Co-authored-by: Jens Scheffler <[email protected]>
    
    ---------
    
    Co-authored-by: Jens Scheffler <[email protected]>
---
 .../kubernetes/executors/kubernetes_executor.py    | 106 +++++++++++++++++++--
 .../executors/test_kubernetes_executor.py          |  73 ++++++++++++++
 2 files changed, 172 insertions(+), 7 deletions(-)

diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
index 8320c566e0b..d109ac097dd 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
@@ -681,22 +681,114 @@ class KubernetesExecutor(BaseExecutor):
         del tis_to_flush_by_key[ti_key]
         self.running.add(ti_key)
 
+    def _alive_other_scheduler_job_ids(self) -> set[int]:
+        """
+        Return job IDs of every SchedulerJob that is currently alive — excluding self.
+
+        "Alive" means ``Job.state == RUNNING`` AND its ``latest_heartbeat`` is
+        within ``[scheduler] scheduler_health_check_threshold``.
+
+        Used by ``_adopt_completed_pods`` to scope cross-scheduler pod
+        adoption to pods owned by no-longer-alive schedulers (#66396).
+        With a single scheduler the returned set is always empty — the
+        original "exclude self only" behavior is preserved. With multiple
+        schedulers each one only adopts pods whose owning scheduler is gone,
+        eliminating the relabel-thrash that PR #61839 introduced.
+
+        Returns an empty set on any DB error so the caller falls back to
+        the pre-#61839 "exclude self only" selector — a transient DB issue
+        must not break completed-pod cleanup.
+        """
+        if TYPE_CHECKING:
+            assert self.scheduler_job_id
+
+        try:
+            self_id = int(self.scheduler_job_id)
+        except (TypeError, ValueError):
+            # Tests sometimes set scheduler_job_id to a non-numeric string.
+            # In production it's always Job.id (int), but be defensive.
+            return set()
+
+        try:
+            from datetime import timedelta
+
+            from sqlalchemy import select
+
+            from airflow.jobs.job import Job
+            from airflow.utils import timezone
+            from airflow.utils.session import create_session
+            from airflow.utils.state import JobState
+
+            timeout = conf.getint("scheduler", "scheduler_health_check_threshold")
+            cutoff = timezone.utcnow() - timedelta(seconds=timeout)
+            with create_session() as session:
+                # Iterate the scalar cursor straight into the set so we never
+                # materialize an intermediate list — keeps the memory
+                # footprint flat regardless of how many sibling schedulers
+                # are alive
+                return {
+                    jid
+                    for jid in session.scalars(
+                        select(Job.id).where(
+                            Job.job_type == "SchedulerJob",
+                            Job.state == JobState.RUNNING,
+                            Job.latest_heartbeat >= cutoff,
+                            Job.id != self_id,
+                        )
+                    )
+                }
+        except Exception as exc:
+            self.log.warning(
+                "Could not query alive SchedulerJobs for completed-pod adoption "
+                "scoping: %s. Falling back to exclude-self-only.",
+                exc,
+            )
+            return set()
+
     def _adopt_completed_pods(self, kube_client: client.CoreV1Api) -> None:
         """
-        Patch completed pods so that the KubernetesJobWatcher can delete them.
+        Patch completed pods owned by no-longer-alive schedulers so this scheduler's watcher can delete them.
+
+        Originally this method patched every Succeeded pod that did not carry
+        THIS scheduler's ``airflow-worker`` label. With multi-scheduler
+        deployments that caused thrashing — every scheduler relabeled every
+        other scheduler's completed pods on each interval tick, fighting over
+        ownership and burning kube-API and watcher cycles (see #66396).
+
+        The fix scopes the selector to also exclude pods owned by every
+        currently-alive sibling scheduler. With one scheduler, behavior is
+        unchanged (no siblings → original "exclude self only" selector). With
+        multiple schedulers, each one only adopts pods whose owning scheduler
+        is gone — preserving the original goal of #61839 (cleanup after a
+        scheduler restart) without the multi-scheduler regression.
 
         :param kube_client: kubernetes client for speaking to kube API
         """
         if TYPE_CHECKING:
             assert self.scheduler_job_id
 
-        new_worker_id_label = self._make_safe_label_value(self.scheduler_job_id)
+        self_label = self._make_safe_label_value(self.scheduler_job_id)
+        excluded_labels = sorted(
+            {
+                self_label,
+                *(self._make_safe_label_value(str(jid)) for jid in self._alive_other_scheduler_job_ids()),
+            }
+        )
+
+        if len(excluded_labels) == 1:
+            # Equality-based selector — preserves the pre-fix label_selector
+            # exactly when no sibling scheduler is alive, so single-scheduler
+            # deployments see no behavior change.
+            worker_filter = f"airflow-worker!={excluded_labels[0]}"
+        else:
+            # Set-based requirement: K8s parses `notin (a,b,c)` as "label
+            # value is none of these". Mixed with the surrounding
+            # equality-based requirements via comma separator.
+            worker_filter = f"airflow-worker notin ({','.join(excluded_labels)})"
+
         query_kwargs = {
             "field_selector": "status.phase=Succeeded",
-            "label_selector": (
-                "kubernetes_executor=True,"
-                f"airflow-worker!={new_worker_id_label},{POD_EXECUTOR_DONE_KEY}!=True"
-            ),
+            "label_selector": (f"kubernetes_executor=True,{worker_filter},{POD_EXECUTOR_DONE_KEY}!=True"),
         }
         pod_list = self._list_pods(query_kwargs)
         for pod in pod_list:
@@ -707,7 +799,7 @@ class KubernetesExecutor(BaseExecutor):
                 kube_client.patch_namespaced_pod(
                     name=pod.metadata.name,
                     namespace=pod.metadata.namespace,
-                    body={"metadata": {"labels": {"airflow-worker": new_worker_id_label}}},
+                    body={"metadata": {"labels": {"airflow-worker": self_label}}},
                 )
             except ApiException as e:
                 self.log.info("Failed to adopt pod %s. Reason: %s", pod.metadata.name, e)
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
index 33036069d36..9805670ec81 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py
@@ -1383,6 +1383,79 @@ class TestKubernetesExecutor:
         assert len(pod_names) == mock_kube_client.patch_namespaced_pod.call_count
         assert executor.running == set()
 
+    @mock.patch(
+        "airflow.providers.cncf.kubernetes.executors.kubernetes_executor."
+        "KubernetesExecutor._alive_other_scheduler_job_ids"
+    )
+    @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
+    @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
+    def test_adopt_completed_pods_excludes_alive_siblings(
+        self, mock_kube_client, mock_kube_dynamic_client, mock_alive_ids
+    ):
+        """
+        With multiple alive schedulers, the label selector must exclude pods owned
+        by every alive sibling — not just self — so the schedulers do not thrash
+        relabeling each other's completed pods (see #66396).
+        """
+        mock_alive_ids.return_value = {7, 9}
+        executor = self.kubernetes_executor
+        executor.scheduler_job_id = "5"
+        executor.kube_client = mock_kube_client
+        mock_kube_dynamic_client.return_value = mock.MagicMock()
+        mock_pod_resource = mock.MagicMock()
+        mock_kube_dynamic_client.return_value.resources.get.return_value = mock_pod_resource
+        mock_kube_dynamic_client.return_value.get.return_value.items = []
+        executor.kube_config.kube_namespace = "somens"
+
+        executor._adopt_completed_pods(mock_kube_client)
+
+        # Selector must use set-based `notin (...)` listing self + every alive sibling,
+        # sorted for determinism. Anything outside that set is a pod whose owning
+        # scheduler is gone and therefore safe for this scheduler to adopt.
+        mock_kube_dynamic_client.return_value.get.assert_called_once_with(
+            resource=mock_pod_resource,
+            namespace="somens",
+            field_selector="status.phase=Succeeded",
+            label_selector=(
+                "kubernetes_executor=True,airflow-worker notin (5,7,9),airflow_executor_done!=True"
+            ),
+            header_params={"Accept": "application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"},
+        )
+
+    @mock.patch(
+        "airflow.providers.cncf.kubernetes.executors.kubernetes_executor."
+        "KubernetesExecutor._alive_other_scheduler_job_ids"
+    )
+    @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient")
+    @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
+    def test_adopt_completed_pods_single_scheduler_unchanged(
+        self, mock_kube_client, mock_kube_dynamic_client, mock_alive_ids
+    ):
+        """
+        With only one scheduler (no alive siblings), the selector must remain
+        identical to the pre-#66396-fix equality form so single-scheduler
+        deployments see no behavior change.
+        """
+        mock_alive_ids.return_value = set()
+        executor = self.kubernetes_executor
+        executor.scheduler_job_id = "modified"
+        executor.kube_client = mock_kube_client
+        mock_kube_dynamic_client.return_value = mock.MagicMock()
+        mock_pod_resource = mock.MagicMock()
+        mock_kube_dynamic_client.return_value.resources.get.return_value = mock_pod_resource
+        mock_kube_dynamic_client.return_value.get.return_value.items = []
+        executor.kube_config.kube_namespace = "somens"
+
+        executor._adopt_completed_pods(mock_kube_client)
+
+        mock_kube_dynamic_client.return_value.get.assert_called_once_with(
+            resource=mock_pod_resource,
+            namespace="somens",
+            field_selector="status.phase=Succeeded",
+            label_selector="kubernetes_executor=True,airflow-worker!=modified,airflow_executor_done!=True",
+            header_params={"Accept": "application/json;as=PartialObjectMetadataList;v=v1;g=meta.k8s.io"},
+        )
+
     @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
     def test_not_adopt_unassigned_task(self, mock_kube_client):
         """
