This is an automated email from the ASF dual-hosted git repository. ash pushed a commit to branch v2-0-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 6dd75596eb619dfa5e398153fe83b52368af1748 Author: Kaxil Naik <[email protected]> AuthorDate: Sat Feb 20 00:46:39 2021 +0000 Scheduler should not fail when invalid executor_config is passed (#14323) closes #14182 (cherry picked from commit e0ee91e15f8385e34e3d7dfc8a6365e350ea7083) --- airflow/executors/kubernetes_executor.py | 8 +++++++- tests/executors/test_kubernetes_executor.py | 20 ++++++++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index 88e26be..fd5c6fa 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -490,7 +490,13 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin): ) -> None: """Executes task asynchronously""" self.log.info('Add task %s with command %s with executor_config %s', key, command, executor_config) - kube_executor_config = PodGenerator.from_obj(executor_config) + try: + kube_executor_config = PodGenerator.from_obj(executor_config) + except Exception: # pylint: disable=broad-except + self.log.error("Invalid executor_config for %s", key) + self.fail(key=key, info="Invalid executor_config passed") + return + if executor_config: pod_template_file = executor_config.get("pod_template_override", None) else: diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py index 9abb328..dc7cbbb 100644 --- a/tests/executors/test_kubernetes_executor.py +++ b/tests/executors/test_kubernetes_executor.py @@ -196,6 +196,26 @@ class TestKubernetesExecutor(unittest.TestCase): @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher') @mock.patch('airflow.executors.kubernetes_executor.get_kube_client') + def test_invalid_executor_config(self, mock_get_kube_client, mock_kubernetes_job_watcher): + executor = self.kubernetes_executor + executor.start() + + assert executor.event_buffer == {} + executor.execute_async( + key=('dag', 'task', datetime.utcnow(), 1), + queue=None, + command=['airflow', 'tasks', 'run', 'true', 'some_parameter'], + executor_config=k8s.V1Pod( + spec=k8s.V1PodSpec( + containers=[k8s.V1Container(name="base", image="myimage", image_pull_policy="Always")] + ) + ), + ) + + assert list(executor.event_buffer.values())[0][1] == "Invalid executor_config passed" + + @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher') + @mock.patch('airflow.executors.kubernetes_executor.get_kube_client') def test_change_state_running(self, mock_get_kube_client, mock_kubernetes_job_watcher): executor = self.kubernetes_executor executor.start()
