droppoint commented on code in PR #35800:
URL: https://github.com/apache/airflow/pull/35800#discussion_r1404083513


##########
airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:
##########
@@ -642,39 +641,6 @@ def adopt_launched_task(
         del tis_to_flush_by_key[ti_key]
         self.running.add(ti_key)
 
-    def _adopt_completed_pods(self, kube_client: client.CoreV1Api) -> None:
-        """
-        Patch completed pods so that the KubernetesJobWatcher can delete them.
-
-        :param kube_client: kubernetes client for speaking to kube API
-        """
-        if TYPE_CHECKING:
-            assert self.scheduler_job_id
-
-        new_worker_id_label = self._make_safe_label_value(self.scheduler_job_id)
-        query_kwargs = {
-            "field_selector": "status.phase=Succeeded",
-            "label_selector": (
-                "kubernetes_executor=True,"
-                f"airflow-worker!={new_worker_id_label},{POD_EXECUTOR_DONE_KEY}!=True"
-            ),
-        }
-        pod_list = self._list_pods(query_kwargs)
-        for pod in pod_list:
-            self.log.info("Attempting to adopt pod %s", pod.metadata.name)
-            from kubernetes.client.rest import ApiException
-
-            try:
-                kube_client.patch_namespaced_pod(
-                    name=pod.metadata.name,
-                    namespace=pod.metadata.namespace,
-                    body={"metadata": {"labels": {"airflow-worker": new_worker_id_label}}},
-                )
-            except ApiException as e:
-                self.log.info("Failed to adopt pod %s. Reason: %s", pod.metadata.name, e)

Review Comment:
   It's not that simple. While it's true that placing a `continue` in the 
except block will fix part of the problem, it won't resolve the race 
condition: slots will continue to leak after that. Please read this 
[comment](https://github.com/apache/airflow/issues/32928#issuecomment-1820413530)
 from issue #32928. In short, the race happens because this function attempts 
to adopt completed pods from other fully operational schedulers every 5 
minutes (by default). When this "adoption" occurs, the TaskInstance key of a 
normally completed pod is added to the `KubernetesExecutor.running` set. 
Since both the pod and the TaskInstance are already complete, no 
`change_state` call will ever occur, leaving the TaskInstance key in the 
`KubernetesExecutor.running` set indefinitely (or until the scheduler is 
restarted). Once the size of `KubernetesExecutor.running` exceeds 
`KubernetesExecutor.parallelism`, the executor will cease scheduling new 
tasks.
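
   A minimal sketch of the leak, for illustration only (the names below are 
simplified stand-ins for the real `KubernetesExecutor` state, not Airflow's 
actual code):

```python
# Hypothetical, simplified model of the slot leak described above.
# `running` stands in for KubernetesExecutor.running and `parallelism`
# for KubernetesExecutor.parallelism; the helper names are made up.
from typing import NamedTuple


class TaskInstanceKey(NamedTuple):
    dag_id: str
    task_id: str
    run_id: str
    try_number: int


running: set[TaskInstanceKey] = set()
parallelism = 32


def adopt_completed_pod(ti_key: TaskInstanceKey) -> None:
    # Adoption adds the key to the running set...
    running.add(ti_key)
    # ...but the pod is already Succeeded and the TaskInstance has already
    # finished, so no state-change event ever arrives to remove it.


def has_open_slots() -> bool:
    # Once len(running) >= parallelism, no new tasks are accepted, even
    # though every "running" entry here is actually long finished.
    return len(running) < parallelism


# Each adoption cycle can pick up completed pods from other healthy
# schedulers, so the set only grows:
for i in range(40):
    adopt_completed_pod(TaskInstanceKey("dag", "task", f"run_{i}", 1))

print(len(running), has_open_slots())  # -> 40 False
```

   This is also why a `continue` alone doesn't help: the keys that leak are 
added when adoption succeeds, not when the patch fails.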



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
