This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-3-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 3c997781d576deda679489d2dc7df9f0bbd38816 Author: Ruben Laguna <ruben.lag...@gmail.com> AuthorDate: Wed May 11 08:25:49 2022 +0200 Prevent KubernetesJobWatcher getting stuck on resource too old (#23521) * Prevent KubernetesJobWatcher getting stuck on resource too old If the watch fails because of "resource too old", the KubernetesJobWatcher should not retry with the same resource version, as that will end up in a loop where there is no progress. * Reset ResourceVersion().resource_version to 0 (cherry picked from commit dee05b2ebca6ab66f1b447837e11fe204f98b2df) --- airflow/executors/kubernetes_executor.py | 3 +++ tests/executors/test_kubernetes_executor.py | 34 +++++++++++++++++++++++++++++ 2 files changed, 37 insertions(+) diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index 9b9de71681..c76cf58f41 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -109,6 +109,8 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin): time.sleep(1) except Exception: self.log.exception('Unknown error in KubernetesJobWatcher. Failing') + self.resource_version = "0" + ResourceVersion().resource_version = "0" raise else: self.log.warning( @@ -288,6 +290,7 @@ class AirflowKubernetesScheduler(LoggingMixin): self.log.error( 'Error while health checking kube watcher process. 
Process died for unknown reasons' ) + ResourceVersion().resource_version = "0" self.kube_watcher = self._make_kube_watcher() def run_next(self, next_job: KubernetesJobType) -> None: diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py index 1a4160d9ce..954f4f0600 100644 --- a/tests/executors/test_kubernetes_executor.py +++ b/tests/executors/test_kubernetes_executor.py @@ -39,6 +39,7 @@ try: AirflowKubernetesScheduler, KubernetesExecutor, KubernetesJobWatcher, + ResourceVersion, create_pod_id, get_base_pod_from_template, ) @@ -957,3 +958,36 @@ class TestKubernetesJobWatcher(unittest.TestCase): f"Kubernetes failure for {raw_object['reason']} " f"with code {raw_object['code']} and message: {raw_object['message']}" ) + + def test_recover_from_resource_too_old(self): + # too old resource + mock_underscore_run = mock.MagicMock() + + def effect(): + yield '500' + while True: + yield Exception('sentinel') + + mock_underscore_run.side_effect = effect() + + self.watcher._run = mock_underscore_run + + with mock.patch('airflow.executors.kubernetes_executor.get_kube_client'): + try: + # self.watcher._run() is mocked and returns "500" as the last resource_version + self.watcher.run() + except Exception as e: + assert e.args == ('sentinel',) + + # both resource_version values should be 0 after _run raises an exception + assert self.watcher.resource_version == '0' + assert ResourceVersion().resource_version == '0' + + # check that in the next run, _run is invoked with resource_version = 0 + mock_underscore_run.reset_mock() + try: + self.watcher.run() + except Exception as e: + assert e.args == ('sentinel',) + + mock_underscore_run.assert_called_once_with(mock.ANY, '0', mock.ANY, mock.ANY)