This is an automated email from the ASF dual-hosted git repository. jedcunningham pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new b8f15a98bf Use `pod_name` when patching done KE pods (#29159) b8f15a98bf is described below commit b8f15a98bf943884262a498df66b81268fa71a98 Author: Jed Cunningham <66968678+jedcunning...@users.noreply.github.com> AuthorDate: Wed Jan 25 16:04:27 2023 -0600 Use `pod_name` when patching done KE pods (#29159) This isn't in a release yet, so let's take the opportunity to fix the naming of the kwarg. --- airflow/executors/kubernetes_executor.py | 10 +++++----- tests/executors/test_kubernetes_executor.py | 11 +++++++---- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index f2802a56da..de48d66ffb 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -370,17 +370,17 @@ class AirflowKubernetesScheduler(LoggingMixin): if e.status != 404: raise - def patch_pod_executor_done(self, *, pod_id: str, namespace: str): + def patch_pod_executor_done(self, *, pod_name: str, namespace: str): """Add a "done" annotation to ensure we don't continually adopt pods""" - self.log.debug("Patching pod %s in namespace %s to mark it as done", pod_id, namespace) + self.log.debug("Patching pod %s in namespace %s to mark it as done", pod_name, namespace) try: self.kube_client.patch_namespaced_pod( - name=pod_id, + name=pod_name, namespace=namespace, body={"metadata": {"labels": {POD_EXECUTOR_DONE_KEY: "True"}}}, ) except ApiException as e: - self.log.info("Failed to patch pod %s with done annotation. Reason: %s", pod_id, e) + self.log.info("Failed to patch pod %s with done annotation. Reason: %s", pod_name, e) def sync(self) -> None: """ @@ -761,7 +761,7 @@ class KubernetesExecutor(BaseExecutor): self.kube_scheduler.delete_pod(pod_id, namespace) self.log.info("Deleted pod: %s in namespace %s", str(key), str(namespace)) else: - self.kube_scheduler.patch_pod_executor_done(pod_id=pod_id, namespace=namespace) + self.kube_scheduler.patch_pod_executor_done(pod_name=pod_id, namespace=namespace) self.log.info("Patched pod %s in namespace %s to mark it as done", str(key), str(namespace)) try: diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py index 99d6faf527..f040eb8d07 100644 --- a/tests/executors/test_kubernetes_executor.py +++ b/tests/executors/test_kubernetes_executor.py @@ -545,10 +545,12 @@ class TestKubernetesExecutor: @mock.patch("airflow.executors.kubernetes_executor.KubernetesJobWatcher") @mock.patch("airflow.executors.kubernetes_executor.get_kube_client") - @mock.patch("airflow.executors.kubernetes_executor.AirflowKubernetesScheduler.delete_pod") + @mock.patch("airflow.executors.kubernetes_executor.AirflowKubernetesScheduler") def test_change_state_failed_no_deletion( - self, mock_delete_pod, mock_get_kube_client, mock_kubernetes_job_watcher + self, mock_kubescheduler, mock_get_kube_client, mock_kubernetes_job_watcher ): + mock_delete_pod = mock_kubescheduler.return_value.delete_pod + mock_patch_pod = mock_kubescheduler.return_value.patch_pod_executor_done executor = self.kubernetes_executor executor.kube_config.delete_worker_pods = False executor.kube_config.delete_worker_pods_on_failure = False @@ -556,10 +558,11 @@ class TestKubernetesExecutor: try: key = ("dag_id", "task_id", "run_id", "try_number3") executor.running = {key} - executor._change_state(key, State.FAILED, "pod_id", "default") + executor._change_state(key, State.FAILED, "pod_id", "test-namespace") assert executor.event_buffer[key][0] == State.FAILED assert executor.running == set() mock_delete_pod.assert_not_called() + mock_patch_pod.assert_called_once_with(pod_name="pod_id", namespace="test-namespace") finally: executor.end() @@ -606,7 +609,7 @@ class TestKubernetesExecutor: assert executor.event_buffer[key][0] == State.SUCCESS assert executor.running == set() mock_delete_pod.assert_not_called() - mock_patch_pod.assert_called_once_with(pod_id="pod_id", namespace="test-namespace") + mock_patch_pod.assert_called_once_with(pod_name="pod_id", namespace="test-namespace") finally: executor.end()