This is an automated email from the ASF dual-hosted git repository. ash pushed a commit to branch v2-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit e32f22a1f791018fd2573eb66958a11a94414b65 Author: MatthewRBruce <[email protected]> AuthorDate: Wed Jun 16 19:16:27 2021 -0400 Fix unsuccessful KubernetesPod final_state call when `is_delete_operator_pod=True` (#15490) If a Kubernetes Pod ends in a state other than `SUCCESS` and `is_delete_operator_pod` is True, then use the `final_state` from the previous `create_new_pod_for_operator` call since the pod is already deleted and the current state can't be re-read. closes: https://github.com/apache/airflow/issues/15456 (cherry picked from commit 4c9735ff9b0201758564fcd64166abde318ec8a7) --- .../cncf/kubernetes/operators/kubernetes_pod.py | 25 +++++++++++----------- .../cncf/kubernetes/utils/pod_launcher.py | 7 +++--- kubernetes_tests/test_kubernetes_pod_operator.py | 10 ++++----- .../test_kubernetes_pod_operator_backcompat.py | 6 +++--- .../kubernetes/operators/test_kubernetes_pod.py | 11 +++++----- 5 files changed, 29 insertions(+), 30 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py index 213614c..d6b2eb2 100644 --- a/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py +++ b/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py @@ -356,15 +356,14 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance- if len(pod_list.items) == 1: try_numbers_match = self._try_numbers_match(context, pod_list.items[0]) - final_state, result = self.handle_pod_overlap( + final_state, remote_pod, result = self.handle_pod_overlap( labels, try_numbers_match, launcher, pod_list.items[0] ) else: self.log.info("creating pod with labels %s and launcher %s", labels, launcher) - final_state, _, result = self.create_new_pod_for_operator(labels, launcher) + final_state, remote_pod, result = self.create_new_pod_for_operator(labels, launcher) if final_state != State.SUCCESS: - status = self.client.read_namespaced_pod(self.pod.metadata.name, self.namespace) - raise AirflowException(f'Pod {self.pod.metadata.name} returned a failure: {status}') + raise AirflowException(f'Pod {self.pod.metadata.name} returned a failure: {remote_pod}') context['task_instance'].xcom_push(key='pod_name', value=self.pod.metadata.name) context['task_instance'].xcom_push(key='pod_namespace', value=self.namespace) return result @@ -373,7 +372,7 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance- def handle_pod_overlap( self, labels: dict, try_numbers_match: bool, launcher: Any, pod: k8s.V1Pod - ) -> Tuple[State, Optional[str]]: + ) -> Tuple[State, k8s.V1Pod, Optional[str]]: """ In cases where the Scheduler restarts while a KubernetesPodOperator task is running, @@ -398,12 +397,12 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance- log_line += " Will attach to this pod and monitor instead of starting new one" self.log.info(log_line) self.pod = pod - final_state, result = self.monitor_launched_pod(launcher, pod) + final_state, remote_pod, result = self.monitor_launched_pod(launcher, pod) else: log_line += f"creating pod with labels {labels} and launcher {launcher}" self.log.info(log_line) - final_state, _, result = self.create_new_pod_for_operator(labels, launcher) - return final_state, result + final_state, remote_pod, result = self.create_new_pod_for_operator(labels, launcher) + return final_state, remote_pod, result @staticmethod def _get_pod_identifying_label_string(labels) -> str: @@ -516,7 +515,7 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance- self.log.debug("Starting pod:\n%s", yaml.safe_dump(self.pod.to_dict())) try: launcher.start_pod(self.pod, startup_timeout=self.startup_timeout_seconds) - final_state, result = launcher.monitor_pod(pod=self.pod, get_logs=self.get_logs) + final_state, remote_pod, result = launcher.monitor_pod(pod=self.pod, get_logs=self.get_logs) except AirflowException: if self.log_events_on_failure: for event in launcher.read_pod_events(self.pod).items: @@ -526,7 +525,7 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance- if self.is_delete_operator_pod: self.log.debug("Deleting pod for task %s", self.task_id) launcher.delete_pod(self.pod) - return final_state, self.pod, result + return final_state, remote_pod, result def patch_already_checked(self, pod: k8s.V1Pod): """Add an "already tried annotation to ensure we only retry once""" @@ -543,7 +542,7 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance- :return: """ try: - (final_state, result) = launcher.monitor_pod(pod, get_logs=self.get_logs) + (final_state, remote_pod, result) = launcher.monitor_pod(pod, get_logs=self.get_logs) finally: if self.is_delete_operator_pod: launcher.delete_pod(pod) @@ -551,9 +550,9 @@ class KubernetesPodOperator(BaseOperator): # pylint: disable=too-many-instance- if self.log_events_on_failure: for event in launcher.read_pod_events(pod).items: self.log.error("Pod Event: %s - %s", event.reason, event.message) - self.patch_already_checked(self.pod) + self.patch_already_checked(pod) raise AirflowException(f'Pod returned a failure: {final_state}') - return final_state, result + return final_state, remote_pod, result def on_kill(self) -> None: if self.pod: diff --git a/airflow/providers/cncf/kubernetes/utils/pod_launcher.py b/airflow/providers/cncf/kubernetes/utils/pod_launcher.py index 741b475..791c77e 100644 --- a/airflow/providers/cncf/kubernetes/utils/pod_launcher.py +++ b/airflow/providers/cncf/kubernetes/utils/pod_launcher.py @@ -129,9 +129,9 @@ class PodLauncher(LoggingMixin): raise AirflowException("Pod took too long to start") time.sleep(1) - def monitor_pod(self, pod: V1Pod, get_logs: bool) -> Tuple[State, Optional[str]]: + def monitor_pod(self, pod: V1Pod, get_logs: bool) -> Tuple[State, V1Pod, Optional[str]]: """ - Monitors a pod and returns the final state + Monitors a pod and returns the final state, pod and xcom result :param pod: pod spec that will be monitored :param get_logs: whether to read the logs locally @@ -167,7 +167,8 @@ class PodLauncher(LoggingMixin): while self.pod_is_running(pod): self.log.info('Pod %s has state %s', pod.metadata.name, State.RUNNING) time.sleep(2) - return self._task_status(self.read_pod(pod)), result + remote_pod = self.read_pod(pod) + return self._task_status(remote_pod), remote_pod, result def parse_log_line(self, line: str) -> Tuple[str, str]: """ diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py index 48e77aa..5208e3d 100644 --- a/kubernetes_tests/test_kubernetes_pod_operator.py +++ b/kubernetes_tests/test_kubernetes_pod_operator.py @@ -596,7 +596,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): do_xcom_push=False, ) # THEN - monitor_mock.return_value = (State.SUCCESS, None) + monitor_mock.return_value = (State.SUCCESS, None, None) context = create_context(k) k.execute(context) assert start_mock.call_args[0][0].spec.containers[0].env_from == [ @@ -828,7 +828,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): task_id="task" + self.get_current_task_name(), pod_template_file=path, do_xcom_push=True ) - monitor_mock.return_value = (State.SUCCESS, None) + monitor_mock.return_value = (State.SUCCESS, None, None) context = create_context(k) with self.assertLogs(k.log, level=logging.DEBUG) as cm: k.execute(context) @@ -924,7 +924,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): priority_class_name=priority_class_name, ) - monitor_mock.return_value = (State.SUCCESS, None) + monitor_mock.return_value = (State.SUCCESS, None, None) context = create_context(k) k.execute(context) actual_pod = self.api_client.sanitize_for_serialization(k.pod) @@ -966,7 +966,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): termination_grace_period=0, ) context = create_context(k) - monitor_mock.return_value = (State.SUCCESS, None) + monitor_mock.return_value = (State.SUCCESS, None, None) k.execute(context) name = k.pod.metadata.name pod = client.read_namespaced_pod(name=name, namespace=namespace) @@ -1000,7 +1000,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): with mock.patch( "airflow.providers.cncf.kubernetes.utils.pod_launcher.PodLauncher.monitor_pod" ) as monitor_mock: - monitor_mock.return_value = (State.SUCCESS, None) + monitor_mock.return_value = (State.SUCCESS, None, None) k.execute(context) name = k.pod.metadata.name pod = client.read_namespaced_pod(name=name, namespace=namespace) diff --git a/kubernetes_tests/test_kubernetes_pod_operator_backcompat.py b/kubernetes_tests/test_kubernetes_pod_operator_backcompat.py index 5facd47..e2fc6bc 100644 --- a/kubernetes_tests/test_kubernetes_pod_operator_backcompat.py +++ b/kubernetes_tests/test_kubernetes_pod_operator_backcompat.py @@ -136,7 +136,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): image_pull_secrets=fake_pull_secrets, cluster_context='default', ) - monitor_mock.return_value = (State.SUCCESS, None) + monitor_mock.return_value = (State.SUCCESS, None, None) context = create_context(k) k.execute(context=context) assert start_mock.call_args[0][0].spec.image_pull_secrets == [ @@ -468,7 +468,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): configmaps=[configmap], ) # THEN - mock_monitor.return_value = (State.SUCCESS, None) + mock_monitor.return_value = (State.SUCCESS, None, None) context = create_context(k) k.execute(context) assert mock_start.call_args[0][0].spec.containers[0].env_from == [ @@ -496,7 +496,7 @@ class TestKubernetesPodOperatorSystem(unittest.TestCase): do_xcom_push=False, ) # THEN - monitor_mock.return_value = (State.SUCCESS, None) + monitor_mock.return_value = (State.SUCCESS, None, None) context = create_context(k) k.execute(context) assert start_mock.call_args[0][0].spec.containers[0].env_from == [ diff --git a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py index a087fc9..cbd0080 100644 --- a/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py +++ b/tests/providers/cncf/kubernetes/operators/test_kubernetes_pod.py @@ -60,7 +60,7 @@ class TestKubernetesPodOperator(unittest.TestCase): } def run_pod(self, operator) -> k8s.V1Pod: - self.monitor_mock.return_value = (State.SUCCESS, None) + self.monitor_mock.return_value = (State.SUCCESS, None, None) context = self.create_context(operator) operator.execute(context=context) return self.start_mock.call_args[0][0] @@ -83,7 +83,7 @@ class TestKubernetesPodOperator(unittest.TestCase): config_file=file_path, cluster_context="default", ) - self.monitor_mock.return_value = (State.SUCCESS, None) + self.monitor_mock.return_value = (State.SUCCESS, None, None) self.client_mock.list_namespaced_pod.return_value = [] context = self.create_context(k) k.execute(context=context) @@ -411,8 +411,8 @@ class TestKubernetesPodOperator(unittest.TestCase): do_xcom_push=False, cluster_context="default", ) - self.monitor_mock.return_value = (State.FAILED, None) failed_pod_status = "read_pod_namespaced_result" + self.monitor_mock.return_value = (State.FAILED, failed_pod_status, None) read_namespaced_pod_mock = self.client_mock.return_value.read_namespaced_pod read_namespaced_pod_mock.return_value = failed_pod_status @@ -424,8 +424,7 @@ class TestKubernetesPodOperator(unittest.TestCase): str(ctx.value) == f"Pod Launching failed: Pod {k.pod.metadata.name} returned a failure: {failed_pod_status}" ) - assert self.client_mock.return_value.read_namespaced_pod.called - assert read_namespaced_pod_mock.call_args[0][0] == k.pod.metadata.name + assert not self.client_mock.return_value.read_namespaced_pod.called def test_no_need_to_describe_pod_on_success(self): name_base = "test" @@ -442,7 +441,7 @@ class TestKubernetesPodOperator(unittest.TestCase): do_xcom_push=False, cluster_context="default", ) - self.monitor_mock.return_value = (State.SUCCESS, None) + self.monitor_mock.return_value = (State.SUCCESS, None, None) context = self.create_context(k) k.execute(context=context)
