This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit e32f22a1f791018fd2573eb66958a11a94414b65
Author: MatthewRBruce <[email protected]>
AuthorDate: Wed Jun 16 19:16:27 2021 -0400

    Fix unsuccessful KubernetesPod final_state call when `is_delete_operator_pod=True` (#15490)
    
    If a Kubernetes Pod ends in a state other than `SUCCESS` and `is_delete_operator_pod` is True, report the failure using the remote pod that the preceding `create_new_pod_for_operator` (or `handle_pod_overlap`) call now returns alongside `final_state`, since the pod has already been deleted and its current state can no longer be re-read.
    
    closes: https://github.com/apache/airflow/issues/15456
    (cherry picked from commit 4c9735ff9b0201758564fcd64166abde318ec8a7)
---
 .../cncf/kubernetes/operators/kubernetes_pod.py    | 25 +++++++++++-----------
 .../cncf/kubernetes/utils/pod_launcher.py          |  7 +++---
 kubernetes_tests/test_kubernetes_pod_operator.py   | 10 ++++-----
 .../test_kubernetes_pod_operator_backcompat.py     |  6 +++---
 .../kubernetes/operators/test_kubernetes_pod.py    | 11 +++++-----
 5 files changed, 29 insertions(+), 30 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py 
b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
index 213614c..d6b2eb2 100644
--- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py
@@ -356,15 +356,14 @@ class KubernetesPodOperator(BaseOperator):  # pylint: 
disable=too-many-instance-
 
             if len(pod_list.items) == 1:
                 try_numbers_match = self._try_numbers_match(context, 
pod_list.items[0])
-                final_state, result = self.handle_pod_overlap(
+                final_state, remote_pod, result = self.handle_pod_overlap(
                     labels, try_numbers_match, launcher, pod_list.items[0]
                 )
             else:
                 self.log.info("creating pod with labels %s and launcher %s", 
labels, launcher)
-                final_state, _, result = 
self.create_new_pod_for_operator(labels, launcher)
+                final_state, remote_pod, result = 
self.create_new_pod_for_operator(labels, launcher)
             if final_state != State.SUCCESS:
-                status = 
self.client.read_namespaced_pod(self.pod.metadata.name, self.namespace)
-                raise AirflowException(f'Pod {self.pod.metadata.name} returned 
a failure: {status}')
+                raise AirflowException(f'Pod {self.pod.metadata.name} returned 
a failure: {remote_pod}')
             context['task_instance'].xcom_push(key='pod_name', 
value=self.pod.metadata.name)
             context['task_instance'].xcom_push(key='pod_namespace', 
value=self.namespace)
             return result
@@ -373,7 +372,7 @@ class KubernetesPodOperator(BaseOperator):  # pylint: 
disable=too-many-instance-
 
     def handle_pod_overlap(
         self, labels: dict, try_numbers_match: bool, launcher: Any, pod: 
k8s.V1Pod
-    ) -> Tuple[State, Optional[str]]:
+    ) -> Tuple[State, k8s.V1Pod, Optional[str]]:
         """
 
         In cases where the Scheduler restarts while a KubernetesPodOperator 
task is running,
@@ -398,12 +397,12 @@ class KubernetesPodOperator(BaseOperator):  # pylint: 
disable=too-many-instance-
             log_line += " Will attach to this pod and monitor instead of 
starting new one"
             self.log.info(log_line)
             self.pod = pod
-            final_state, result = self.monitor_launched_pod(launcher, pod)
+            final_state, remote_pod, result = 
self.monitor_launched_pod(launcher, pod)
         else:
             log_line += f"creating pod with labels {labels} and launcher 
{launcher}"
             self.log.info(log_line)
-            final_state, _, result = self.create_new_pod_for_operator(labels, 
launcher)
-        return final_state, result
+            final_state, remote_pod, result = 
self.create_new_pod_for_operator(labels, launcher)
+        return final_state, remote_pod, result
 
     @staticmethod
     def _get_pod_identifying_label_string(labels) -> str:
@@ -516,7 +515,7 @@ class KubernetesPodOperator(BaseOperator):  # pylint: 
disable=too-many-instance-
         self.log.debug("Starting pod:\n%s", yaml.safe_dump(self.pod.to_dict()))
         try:
             launcher.start_pod(self.pod, 
startup_timeout=self.startup_timeout_seconds)
-            final_state, result = launcher.monitor_pod(pod=self.pod, 
get_logs=self.get_logs)
+            final_state, remote_pod, result = 
launcher.monitor_pod(pod=self.pod, get_logs=self.get_logs)
         except AirflowException:
             if self.log_events_on_failure:
                 for event in launcher.read_pod_events(self.pod).items:
@@ -526,7 +525,7 @@ class KubernetesPodOperator(BaseOperator):  # pylint: 
disable=too-many-instance-
             if self.is_delete_operator_pod:
                 self.log.debug("Deleting pod for task %s", self.task_id)
                 launcher.delete_pod(self.pod)
-        return final_state, self.pod, result
+        return final_state, remote_pod, result
 
     def patch_already_checked(self, pod: k8s.V1Pod):
         """Add an "already tried annotation to ensure we only retry once"""
@@ -543,7 +542,7 @@ class KubernetesPodOperator(BaseOperator):  # pylint: 
disable=too-many-instance-
         :return:
         """
         try:
-            (final_state, result) = launcher.monitor_pod(pod, 
get_logs=self.get_logs)
+            (final_state, remote_pod, result) = launcher.monitor_pod(pod, 
get_logs=self.get_logs)
         finally:
             if self.is_delete_operator_pod:
                 launcher.delete_pod(pod)
@@ -551,9 +550,9 @@ class KubernetesPodOperator(BaseOperator):  # pylint: 
disable=too-many-instance-
             if self.log_events_on_failure:
                 for event in launcher.read_pod_events(pod).items:
                     self.log.error("Pod Event: %s - %s", event.reason, 
event.message)
-            self.patch_already_checked(self.pod)
+            self.patch_already_checked(pod)
             raise AirflowException(f'Pod returned a failure: {final_state}')
-        return final_state, result
+        return final_state, remote_pod, result
 
     def on_kill(self) -> None:
         if self.pod:
diff --git a/airflow/providers/cncf/kubernetes/utils/pod_launcher.py 
b/airflow/providers/cncf/kubernetes/utils/pod_launcher.py
index 741b475..791c77e 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_launcher.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_launcher.py
@@ -129,9 +129,9 @@ class PodLauncher(LoggingMixin):
                     raise AirflowException("Pod took too long to start")
                 time.sleep(1)
 
-    def monitor_pod(self, pod: V1Pod, get_logs: bool) -> Tuple[State, 
Optional[str]]:
+    def monitor_pod(self, pod: V1Pod, get_logs: bool) -> Tuple[State, V1Pod, 
Optional[str]]:
         """
-        Monitors a pod and returns the final state
+        Monitors a pod and returns the final state, pod and xcom result
 
         :param pod: pod spec that will be monitored
         :param get_logs: whether to read the logs locally
@@ -167,7 +167,8 @@ class PodLauncher(LoggingMixin):
         while self.pod_is_running(pod):
             self.log.info('Pod %s has state %s', pod.metadata.name, 
State.RUNNING)
             time.sleep(2)
-        return self._task_status(self.read_pod(pod)), result
+        remote_pod = self.read_pod(pod)
+        return self._task_status(remote_pod), remote_pod, result
 
     def parse_log_line(self, line: str) -> Tuple[str, str]:
         """
diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py 
b/kubernetes_tests/test_kubernetes_pod_operator.py
index 48e77aa..5208e3d 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -596,7 +596,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             do_xcom_push=False,
         )
         # THEN
-        monitor_mock.return_value = (State.SUCCESS, None)
+        monitor_mock.return_value = (State.SUCCESS, None, None)
         context = create_context(k)
         k.execute(context)
         assert start_mock.call_args[0][0].spec.containers[0].env_from == [
@@ -828,7 +828,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             task_id="task" + self.get_current_task_name(), 
pod_template_file=path, do_xcom_push=True
         )
 
-        monitor_mock.return_value = (State.SUCCESS, None)
+        monitor_mock.return_value = (State.SUCCESS, None, None)
         context = create_context(k)
         with self.assertLogs(k.log, level=logging.DEBUG) as cm:
             k.execute(context)
@@ -924,7 +924,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             priority_class_name=priority_class_name,
         )
 
-        monitor_mock.return_value = (State.SUCCESS, None)
+        monitor_mock.return_value = (State.SUCCESS, None, None)
         context = create_context(k)
         k.execute(context)
         actual_pod = self.api_client.sanitize_for_serialization(k.pod)
@@ -966,7 +966,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             termination_grace_period=0,
         )
         context = create_context(k)
-        monitor_mock.return_value = (State.SUCCESS, None)
+        monitor_mock.return_value = (State.SUCCESS, None, None)
         k.execute(context)
         name = k.pod.metadata.name
         pod = client.read_namespaced_pod(name=name, namespace=namespace)
@@ -1000,7 +1000,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
         with mock.patch(
             
"airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.monitor_pod"
         ) as monitor_mock:
-            monitor_mock.return_value = (State.SUCCESS, None)
+            monitor_mock.return_value = (State.SUCCESS, None, None)
             k.execute(context)
             name = k.pod.metadata.name
             pod = client.read_namespaced_pod(name=name, namespace=namespace)
diff --git a/kubernetes_tests/test_kubernetes_pod_operator_backcompat.py 
b/kubernetes_tests/test_kubernetes_pod_operator_backcompat.py
index 5facd47..e2fc6bc 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator_backcompat.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator_backcompat.py
@@ -136,7 +136,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             image_pull_secrets=fake_pull_secrets,
             cluster_context='default',
         )
-        monitor_mock.return_value = (State.SUCCESS, None)
+        monitor_mock.return_value = (State.SUCCESS, None, None)
         context = create_context(k)
         k.execute(context=context)
         assert start_mock.call_args[0][0].spec.image_pull_secrets == [
@@ -468,7 +468,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             configmaps=[configmap],
         )
         # THEN
-        mock_monitor.return_value = (State.SUCCESS, None)
+        mock_monitor.return_value = (State.SUCCESS, None, None)
         context = create_context(k)
         k.execute(context)
         assert mock_start.call_args[0][0].spec.containers[0].env_from == [
@@ -496,7 +496,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase):
             do_xcom_push=False,
         )
         # THEN
-        monitor_mock.return_value = (State.SUCCESS, None)
+        monitor_mock.return_value = (State.SUCCESS, None, None)
         context = create_context(k)
         k.execute(context)
         assert start_mock.call_args[0][0].spec.containers[0].env_from == [
diff --git a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py 
b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
index a087fc9..cbd0080 100644
--- a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
+++ b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py
@@ -60,7 +60,7 @@ class TestKubernetesPodOperator(unittest.TestCase):
         }
 
     def run_pod(self, operator) -> k8s.V1Pod:
-        self.monitor_mock.return_value = (State.SUCCESS, None)
+        self.monitor_mock.return_value = (State.SUCCESS, None, None)
         context = self.create_context(operator)
         operator.execute(context=context)
         return self.start_mock.call_args[0][0]
@@ -83,7 +83,7 @@ class TestKubernetesPodOperator(unittest.TestCase):
             config_file=file_path,
             cluster_context="default",
         )
-        self.monitor_mock.return_value = (State.SUCCESS, None)
+        self.monitor_mock.return_value = (State.SUCCESS, None, None)
         self.client_mock.list_namespaced_pod.return_value = []
         context = self.create_context(k)
         k.execute(context=context)
@@ -411,8 +411,8 @@ class TestKubernetesPodOperator(unittest.TestCase):
             do_xcom_push=False,
             cluster_context="default",
         )
-        self.monitor_mock.return_value = (State.FAILED, None)
         failed_pod_status = "read_pod_namespaced_result"
+        self.monitor_mock.return_value = (State.FAILED, failed_pod_status, 
None)
         read_namespaced_pod_mock = 
self.client_mock.return_value.read_namespaced_pod
         read_namespaced_pod_mock.return_value = failed_pod_status
 
@@ -424,8 +424,7 @@ class TestKubernetesPodOperator(unittest.TestCase):
             str(ctx.value)
             == f"Pod Launching failed: Pod {k.pod.metadata.name} returned a 
failure: {failed_pod_status}"
         )
-        assert self.client_mock.return_value.read_namespaced_pod.called
-        assert read_namespaced_pod_mock.call_args[0][0] == k.pod.metadata.name
+        assert not self.client_mock.return_value.read_namespaced_pod.called
 
     def test_no_need_to_describe_pod_on_success(self):
         name_base = "test"
@@ -442,7 +441,7 @@ class TestKubernetesPodOperator(unittest.TestCase):
             do_xcom_push=False,
             cluster_context="default",
         )
-        self.monitor_mock.return_value = (State.SUCCESS, None)
+        self.monitor_mock.return_value = (State.SUCCESS, None, None)
 
         context = self.create_context(k)
         k.execute(context=context)

Reply via email to