This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new b8f15a98bf Use `pod_name` when patching done KE pods (#29159)
b8f15a98bf is described below

commit b8f15a98bf943884262a498df66b81268fa71a98
Author: Jed Cunningham <66968678+jedcunning...@users.noreply.github.com>
AuthorDate: Wed Jan 25 16:04:27 2023 -0600

    Use `pod_name` when patching done KE pods (#29159)
    
    This isn't in a release yet, so let's take the opportunity to fix the
    naming of the kwarg.
---
 airflow/executors/kubernetes_executor.py    | 10 +++++-----
 tests/executors/test_kubernetes_executor.py | 11 +++++++----
 2 files changed, 12 insertions(+), 9 deletions(-)

diff --git a/airflow/executors/kubernetes_executor.py 
b/airflow/executors/kubernetes_executor.py
index f2802a56da..de48d66ffb 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -370,17 +370,17 @@ class AirflowKubernetesScheduler(LoggingMixin):
             if e.status != 404:
                 raise
 
-    def patch_pod_executor_done(self, *, pod_id: str, namespace: str):
+    def patch_pod_executor_done(self, *, pod_name: str, namespace: str):
         """Add a "done" annotation to ensure we don't continually adopt pods"""
-        self.log.debug("Patching pod %s in namespace %s to mark it as done", 
pod_id, namespace)
+        self.log.debug("Patching pod %s in namespace %s to mark it as done", 
pod_name, namespace)
         try:
             self.kube_client.patch_namespaced_pod(
-                name=pod_id,
+                name=pod_name,
                 namespace=namespace,
                 body={"metadata": {"labels": {POD_EXECUTOR_DONE_KEY: "True"}}},
             )
         except ApiException as e:
-            self.log.info("Failed to patch pod %s with done annotation. 
Reason: %s", pod_id, e)
+            self.log.info("Failed to patch pod %s with done annotation. 
Reason: %s", pod_name, e)
 
     def sync(self) -> None:
         """
@@ -761,7 +761,7 @@ class KubernetesExecutor(BaseExecutor):
                 self.kube_scheduler.delete_pod(pod_id, namespace)
                 self.log.info("Deleted pod: %s in namespace %s", str(key), 
str(namespace))
         else:
-            self.kube_scheduler.patch_pod_executor_done(pod_id=pod_id, 
namespace=namespace)
+            self.kube_scheduler.patch_pod_executor_done(pod_name=pod_id, 
namespace=namespace)
             self.log.info("Patched pod %s in namespace %s to mark it as done", 
str(key), str(namespace))
 
         try:
diff --git a/tests/executors/test_kubernetes_executor.py 
b/tests/executors/test_kubernetes_executor.py
index 99d6faf527..f040eb8d07 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -545,10 +545,12 @@ class TestKubernetesExecutor:
 
     @mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher")
     @mock.patch("airflow.executors.kubernetes_executor.get_kube_client")
-    
@mock.patch("airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod")
+    
@mock.patch("airflow.executors.kubernetes_executor.AirflowKubernetesScheduler")
     def test_change_state_failed_no_deletion(
-        self, mock_delete_pod, mock_get_kube_client, 
mock_kubernetes_job_watcher
+        self, mock_kubescheduler, mock_get_kube_client, 
mock_kubernetes_job_watcher
     ):
+        mock_delete_pod = mock_kubescheduler.return_value.delete_pod
+        mock_patch_pod = 
mock_kubescheduler.return_value.patch_pod_executor_done
         executor = self.kubernetes_executor
         executor.kube_config.delete_worker_pods = False
         executor.kube_config.delete_worker_pods_on_failure = False
@@ -556,10 +558,11 @@ class TestKubernetesExecutor:
         try:
             key = ("dag_id", "task_id", "run_id", "try_number3")
             executor.running = {key}
-            executor._change_state(key, State.FAILED, "pod_id", "default")
+            executor._change_state(key, State.FAILED, "pod_id", 
"test-namespace")
             assert executor.event_buffer[key][0] == State.FAILED
             assert executor.running == set()
             mock_delete_pod.assert_not_called()
+            mock_patch_pod.assert_called_once_with(pod_name="pod_id", 
namespace="test-namespace")
         finally:
             executor.end()
 
@@ -606,7 +609,7 @@ class TestKubernetesExecutor:
             assert executor.event_buffer[key][0] == State.SUCCESS
             assert executor.running == set()
             mock_delete_pod.assert_not_called()
-            mock_patch_pod.assert_called_once_with(pod_id="pod_id", 
namespace="test-namespace")
+            mock_patch_pod.assert_called_once_with(pod_name="pod_id", 
namespace="test-namespace")
         finally:
             executor.end()
 

Reply via email to