This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 49108e15eb Kubernetes executor running slots leak fix (#36240)
49108e15eb is described below

commit 49108e15eb2eb30e2ccb95c9332db7b38d35f2de
Author: Gopal Dirisala <39794726+dir...@users.noreply.github.com>
AuthorDate: Wed Dec 20 20:42:40 2023 +0530

    Kubernetes executor running slots leak fix (#36240)
    
    
    
    ---------
    
    Co-authored-by: gopal <gopal_diris...@apple.com>
---
 .../kubernetes/executors/kubernetes_executor.py    | 16 ++++++-
 .../executors/kubernetes_executor_types.py         |  7 ++--
 .../executors/kubernetes_executor_utils.py         |  9 +++-
 .../executors/test_kubernetes_executor.py          | 49 ++++++++++++++++++++--
 4 files changed, 71 insertions(+), 10 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
index 6e32c00473..a5d911f8ce 100644
--- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
+++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
@@ -75,7 +75,10 @@ except ImportError:
     raise
 from airflow.configuration import conf
 from airflow.executors.base_executor import BaseExecutor
-from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import POD_EXECUTOR_DONE_KEY
+from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import (
+    ADOPTED,
+    POD_EXECUTOR_DONE_KEY,
+)
 from airflow.providers.cncf.kubernetes.kube_config import KubeConfig
 from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import annotations_to_key
 from airflow.utils.event_scheduler import EventScheduler
@@ -463,7 +466,7 @@ class KubernetesExecutor(BaseExecutor):
     def _change_state(
         self,
         key: TaskInstanceKey,
-        state: TaskInstanceState | None,
+        state: TaskInstanceState | str | None,
         pod_name: str,
         namespace: str,
         session: Session = NEW_SESSION,
@@ -471,6 +474,15 @@ class KubernetesExecutor(BaseExecutor):
         if TYPE_CHECKING:
             assert self.kube_scheduler
 
+        if state == ADOPTED:
+            # When the task pod is adopted by another executor,
+            # then remove the task from the current executor running queue.
+            try:
+                self.running.remove(key)
+            except KeyError:
+                self.log.debug("TI key not in running: %s", key)
+            return
+
         if state == TaskInstanceState.RUNNING:
             self.event_buffer[key] = state, None
             return
diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py
index 80b8f1de72..4229136298 100644
--- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py
+++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_types.py
@@ -16,8 +16,9 @@
 # under the License.
 from __future__ import annotations
 
-from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple
+from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple, Union
 
+ADOPTED = "adopted"
 if TYPE_CHECKING:
     from airflow.executors.base_executor import CommandType
     from airflow.models.taskinstance import TaskInstanceKey
@@ -27,10 +28,10 @@ if TYPE_CHECKING:
     KubernetesJobType = Tuple[TaskInstanceKey, CommandType, Any, Optional[str]]
 
     # key, pod state, pod_name, namespace, resource_version
-    KubernetesResultsType = Tuple[TaskInstanceKey, Optional[TaskInstanceState], str, str, str]
+    KubernetesResultsType = Tuple[TaskInstanceKey, Optional[Union[TaskInstanceState, str]], str, str, str]
 
     # pod_name, namespace, pod state, annotations, resource_version
-    KubernetesWatchType = Tuple[str, str, Optional[TaskInstanceState], Dict[str, str], str]
+    KubernetesWatchType = Tuple[str, str, Optional[Union[TaskInstanceState, str]], Dict[str, str], str]
 
 ALL_NAMESPACES = "ALL_NAMESPACES"
 POD_EXECUTOR_DONE_KEY = "airflow_executor_done"
diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
index 074b65d198..09b47aea19 100644
--- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
+++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
@@ -40,6 +40,7 @@ from airflow.utils.state import TaskInstanceState
 
 try:
     from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import (
+        ADOPTED,
         ALL_NAMESPACES,
         POD_EXECUTOR_DONE_KEY,
     )
@@ -220,7 +221,13 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
         pod = event["object"]
         annotations_string = annotations_for_logging_task_metadata(annotations)
         """Process status response."""
-        if status == "Pending":
+        if event["type"] == "DELETED" and not pod.metadata.deletion_timestamp:
+            # This will happen only when the task pods are adopted by another executor.
+            # So, there is no change in the pod state.
+            # However, need to free the executor slot from the current executor.
+            self.log.info("Event: pod %s adopted, annotations: %s", pod_name, annotations_string)
+            self.watcher_queue.put((pod_name, namespace, ADOPTED, annotations, resource_version))
+        elif status == "Pending":
             # deletion_timestamp is set by kube server when a graceful deletion is requested.
             # since kube server have received request to delete pod set TI state failed
             if event["type"] == "DELETED" and pod.metadata.deletion_timestamp:
diff --git a/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py b/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
index a0b187087a..15b17515c1 100644
--- a/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
+++ b/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
@@ -44,7 +44,10 @@ try:
         KubernetesExecutor,
         PodReconciliationError,
     )
-    from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import POD_EXECUTOR_DONE_KEY
+    from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import (
+        ADOPTED,
+        POD_EXECUTOR_DONE_KEY,
+    )
     from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils import (
         AirflowKubernetesScheduler,
         KubernetesJobWatcher,
@@ -644,6 +647,25 @@ class TestKubernetesExecutor:
         finally:
             executor.end()
 
+    @pytest.mark.db_test
+    @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher")
+    @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")
+    @mock.patch(
+        "airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.AirflowKubernetesScheduler.delete_pod"
+    )
+    def test_change_state_adopted(self, mock_delete_pod, mock_get_kube_client, mock_kubernetes_job_watcher):
+        executor = self.kubernetes_executor
+        executor.start()
+        try:
+            key = ("dag_id", "task_id", "run_id", "try_number2")
+            executor.running = {key}
+            executor._change_state(key, ADOPTED, "pod_name", "default")
+            assert len(executor.event_buffer) == 0
+            assert len(executor.running) == 0
+            mock_delete_pod.assert_not_called()
+        finally:
+            executor.end()
+
     @pytest.mark.db_test
     @pytest.mark.parametrize(
         "multi_namespace_mode_namespace_list, watchers_keys",
@@ -1431,12 +1453,31 @@ class TestKubernetesJobWatcher:
         self._run()
         self.watcher.watcher_queue.put.assert_not_called()
 
-    def test_process_status_succeeded_type_delete(self):
-        self.pod.status.phase = "Succeeded"
+    @pytest.mark.parametrize(
+        "ti_state",
+        [
+            TaskInstanceState.SUCCESS,
+            TaskInstanceState.FAILED,
+            TaskInstanceState.RUNNING,
+            TaskInstanceState.QUEUED,
+            TaskInstanceState.UP_FOR_RETRY,
+        ],
+    )
+    def test_process_status_pod_adopted(self, ti_state):
+        self.pod.status.phase = ti_state
         self.events.append({"type": "DELETED", "object": self.pod})
+        self.pod.metadata.deletion_timestamp = None
 
         self._run()
-        self.watcher.watcher_queue.put.assert_not_called()
+        self.watcher.watcher_queue.put.assert_called_once_with(
+            (
+                self.pod.metadata.name,
+                self.watcher.namespace,
+                ADOPTED,
+                self.core_annotations,
+                self.pod.metadata.resource_version,
+            )
+        )
 
     def test_process_status_running_deleted(self):
         self.pod.status.phase = "Running"

Reply via email to