This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 59afddd2f27aa0529c602da825e1ee429fe3934b
Author: Jed Cunningham <[email protected]>
AuthorDate: Mon Mar 15 15:16:49 2021 -0600

    Fix KubernetesExecutor issue with deleted pending pods (#14810)
    
    This change treats a pending KubernetesExecutor worker pod deletion
    as a failure. This allows them to follow the configured retry rules
    for the task as one would expect.
    
    (cherry picked from commit a639dd364865da7367f342d5721a5f46a7188a29)
---
 airflow/executors/kubernetes_executor.py    |  6 +-
 tests/executors/test_kubernetes_executor.py | 92 +++++++++++++++++++++++++++++
 2 files changed, 94 insertions(+), 4 deletions(-)

diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index fd5c6fa..c42531a 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -194,10 +194,8 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
         """Process status response"""
         if status == 'Pending':
             if event['type'] == 'DELETED':
-                self.log.info('Event: Failed to start pod %s, will reschedule', pod_id)
-                self.watcher_queue.put(
-                    (pod_id, namespace, State.UP_FOR_RESCHEDULE, annotations, resource_version)
-                )
+                self.log.info('Event: Failed to start pod %s', pod_id)
+                self.watcher_queue.put((pod_id, namespace, State.FAILED, annotations, resource_version))
             else:
                 self.log.info('Event: %s Pending', pod_id)
         elif status == 'Failed':
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index dc7cbbb..68b0006 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -34,6 +34,7 @@ try:
     from airflow.executors.kubernetes_executor import (
         AirflowKubernetesScheduler,
         KubernetesExecutor,
+        KubernetesJobWatcher,
         create_pod_id,
         get_base_pod_from_template,
     )
@@ -328,3 +329,94 @@ class TestKubernetesExecutor(unittest.TestCase):
         executor.adopt_launched_task(mock_kube_client, pod=pod, pod_ids=pod_ids)
         assert not mock_kube_client.patch_namespaced_pod.called
         assert pod_ids == {"foobar": {}}
+
+
+class TestKubernetesJobWatcher(unittest.TestCase):
+    def setUp(self):
+        self.watcher = KubernetesJobWatcher(
+            namespace="airflow",
+            multi_namespace_mode=False,
+            watcher_queue=mock.MagicMock(),
+            resource_version="0",
+            scheduler_job_id="123",
+            kube_config=mock.MagicMock(),
+        )
+        self.kube_client = mock.MagicMock()
+        self.core_annotations = {
+            "dag_id": "dag",
+            "task_id": "task",
+            "execution_date": "dt",
+            "try_number": "1",
+        }
+        self.pod = k8s.V1Pod(
+            metadata=k8s.V1ObjectMeta(
+                name="foo",
+                annotations={"airflow-worker": "bar", **self.core_annotations},
+                namespace="airflow",
+                resource_version="456",
+            ),
+            status=k8s.V1PodStatus(phase="Pending"),
+        )
+        self.events = []
+
+    def _run(self):
+        with mock.patch('airflow.executors.kubernetes_executor.watch') as mock_watch:
+            mock_watch.Watch.return_value.stream.return_value = self.events
+            latest_resource_version = self.watcher._run(
+                self.kube_client,
+                self.watcher.resource_version,
+                self.watcher.scheduler_job_id,
+                self.watcher.kube_config,
+            )
+            assert self.pod.metadata.resource_version == latest_resource_version
+
+    def assert_watcher_queue_called_once_with_state(self, state):
+        self.watcher.watcher_queue.put.assert_called_once_with(
+            (
+                self.pod.metadata.name,
+                self.watcher.namespace,
+                state,
+                self.core_annotations,
+                self.pod.metadata.resource_version,
+            )
+        )
+
+    def test_process_status_pending(self):
+        self.events.append({"type": 'MODIFIED', "object": self.pod})
+
+        self._run()
+        self.watcher.watcher_queue.put.assert_not_called()
+
+    def test_process_status_pending_deleted(self):
+        self.events.append({"type": 'DELETED', "object": self.pod})
+
+        self._run()
+        self.assert_watcher_queue_called_once_with_state(State.FAILED)
+
+    def test_process_status_failed(self):
+        self.pod.status.phase = "Failed"
+        self.events.append({"type": 'MODIFIED', "object": self.pod})
+
+        self._run()
+        self.assert_watcher_queue_called_once_with_state(State.FAILED)
+
+    def test_process_status_succeeded(self):
+        self.pod.status.phase = "Succeeded"
+        self.events.append({"type": 'MODIFIED', "object": self.pod})
+
+        self._run()
+        self.assert_watcher_queue_called_once_with_state(None)
+
+    def test_process_status_running(self):
+        self.pod.status.phase = "Running"
+        self.events.append({"type": 'MODIFIED', "object": self.pod})
+
+        self._run()
+        self.watcher.watcher_queue.put.assert_not_called()
+
+    def test_process_status_catchall(self):
+        self.pod.status.phase = "Unknown"
+        self.events.append({"type": 'MODIFIED', "object": self.pod})
+
+        self._run()
+        self.watcher.watcher_queue.put.assert_not_called()

Reply via email to