droppoint commented on code in PR #35800: URL: https://github.com/apache/airflow/pull/35800#discussion_r1404083513
##########
airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:
##########

```diff
@@ -642,39 +641,6 @@ def adopt_launched_task(
             del tis_to_flush_by_key[ti_key]
         self.running.add(ti_key)
 
-    def _adopt_completed_pods(self, kube_client: client.CoreV1Api) -> None:
-        """
-        Patch completed pods so that the KubernetesJobWatcher can delete them.
-
-        :param kube_client: kubernetes client for speaking to kube API
-        """
-        if TYPE_CHECKING:
-            assert self.scheduler_job_id
-
-        new_worker_id_label = self._make_safe_label_value(self.scheduler_job_id)
-        query_kwargs = {
-            "field_selector": "status.phase=Succeeded",
-            "label_selector": (
-                "kubernetes_executor=True,"
-                f"airflow-worker!={new_worker_id_label},{POD_EXECUTOR_DONE_KEY}!=True"
-            ),
-        }
-        pod_list = self._list_pods(query_kwargs)
-        for pod in pod_list:
-            self.log.info("Attempting to adopt pod %s", pod.metadata.name)
-            from kubernetes.client.rest import ApiException
-
-            try:
-                kube_client.patch_namespaced_pod(
-                    name=pod.metadata.name,
-                    namespace=pod.metadata.namespace,
-                    body={"metadata": {"labels": {"airflow-worker": new_worker_id_label}}},
-                )
-            except ApiException as e:
-                self.log.info("Failed to adopt pod %s. Reason: %s", pod.metadata.name, e)
```

Review Comment:
It's not that simple. While it's true that placing `continue` in the except block will fix part of the problem, it won't resolve the race condition; slots will continue to leak after that. Please read this [comment](https://github.com/apache/airflow/issues/32928#issuecomment-1820413530) from issue #32928. In short, the race happens because this function attempts to adopt completed pods every 5 minutes (by default) from other fully operational schedulers. When this "adoption" occurs, the TaskInstance key of a normally completed pod is added to the `KubernetesExecutor.running` set.
Since both the pod and the TaskInstance are complete, no `change_state` call will occur, leaving the TaskInstance key in the `KubernetesExecutor.running` set indefinitely (or until the scheduler is restarted). Once the size of `KubernetesExecutor.running` exceeds `KubernetesExecutor.parallelism`, the executor will cease scheduling new tasks.
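The slot leak can be illustrated with a toy model. This is a hedged sketch, not Airflow's actual code: `FakeExecutor`, `open_slots`, and `adopt_completed_pod` are hypothetical stand-ins for the real executor's bookkeeping.

```python
class FakeExecutor:
    """Toy stand-in for KubernetesExecutor's slot accounting (hypothetical)."""

    def __init__(self, parallelism: int):
        self.parallelism = parallelism
        # TaskInstance keys the executor believes are still running.
        self.running: set[tuple] = set()

    @property
    def open_slots(self) -> int:
        return self.parallelism - len(self.running)

    def adopt_completed_pod(self, ti_key: tuple) -> None:
        # Mirrors the problem described above: the key is added to `running`
        # even though the pod already succeeded, so no state change will
        # ever remove it again.
        self.running.add(ti_key)


executor = FakeExecutor(parallelism=3)

# Each periodic "adoption" cycle picks up already-completed pods that were
# launched by other fully operational schedulers.
for i in range(3):
    executor.adopt_completed_pod(("dag_id", f"task_{i}", "run_1", 1))

# No change_state ever fires for these keys, so the slots never come back.
print(executor.open_slots)  # 0 -> the executor stops scheduling new tasks
```

Under this model a simple `continue` on the patch failure does not help: the leak happens on the successful adoption path, not the error path.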