This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-3-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 3c997781d576deda679489d2dc7df9f0bbd38816
Author: Ruben Laguna <ruben.lag...@gmail.com>
AuthorDate: Wed May 11 08:25:49 2022 +0200

    Prevent KubernetesJobWatcher getting stuck on resource too old (#23521)
    
    * Prevent KubernetesJobWatcher getting stuck on resource too old
    
    If the watch fails because of "resource too old", the
    KubernetesJobWatcher should not retry with the same resource version,
    as that will end up in a loop where no progress is made.
    
    * Reset ResourceVersion().resource_version to 0
    
    (cherry picked from commit dee05b2ebca6ab66f1b447837e11fe204f98b2df)
---
 airflow/executors/kubernetes_executor.py    |  3 +++
 tests/executors/test_kubernetes_executor.py | 34 +++++++++++++++++++++++++++++
 2 files changed, 37 insertions(+)

diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index 9b9de71681..c76cf58f41 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -109,6 +109,8 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
                 time.sleep(1)
             except Exception:
                 self.log.exception('Unknown error in KubernetesJobWatcher. 
Failing')
+                self.resource_version = "0"
+                ResourceVersion().resource_version = "0"
                 raise
             else:
                 self.log.warning(
@@ -288,6 +290,7 @@ class AirflowKubernetesScheduler(LoggingMixin):
             self.log.error(
                 'Error while health checking kube watcher process. Process 
died for unknown reasons'
             )
+            ResourceVersion().resource_version = "0"
             self.kube_watcher = self._make_kube_watcher()
 
     def run_next(self, next_job: KubernetesJobType) -> None:
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index 1a4160d9ce..954f4f0600 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -39,6 +39,7 @@ try:
         AirflowKubernetesScheduler,
         KubernetesExecutor,
         KubernetesJobWatcher,
+        ResourceVersion,
         create_pod_id,
         get_base_pod_from_template,
     )
@@ -957,3 +958,36 @@ class TestKubernetesJobWatcher(unittest.TestCase):
             f"Kubernetes failure for {raw_object['reason']} "
             f"with code {raw_object['code']} and message: 
{raw_object['message']}"
         )
+
+    def test_recover_from_resource_too_old(self):
+        # too old resource
+        mock_underscore_run = mock.MagicMock()
+
+        def effect():
+            yield '500'
+            while True:
+                yield Exception('sentinel')
+
+        mock_underscore_run.side_effect = effect()
+
+        self.watcher._run = mock_underscore_run
+
+        with 
mock.patch('airflow.executors.kubernetes_executor.get_kube_client'):
+            try:
+                # self.watcher._run() is mocked and returns "500" as the last resource_version
+                self.watcher.run()
+            except Exception as e:
+                assert e.args == ('sentinel',)
+
+            # both resource_version values should be 0 after _run raises an exception
+            assert self.watcher.resource_version == '0'
+            assert ResourceVersion().resource_version == '0'
+
+            # check that in the next run, _run is invoked with 
resource_version = 0
+            mock_underscore_run.reset_mock()
+            try:
+                self.watcher.run()
+            except Exception as e:
+                assert e.args == ('sentinel',)
+
+            mock_underscore_run.assert_called_once_with(mock.ANY, '0', 
mock.ANY, mock.ANY)

Reply via email to