This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 6dd75596eb619dfa5e398153fe83b52368af1748
Author: Kaxil Naik <[email protected]>
AuthorDate: Sat Feb 20 00:46:39 2021 +0000

    Scheduler should not fail when invalid executor_config is passed (#14323)
    
    closes #14182
    
    (cherry picked from commit e0ee91e15f8385e34e3d7dfc8a6365e350ea7083)
---
 airflow/executors/kubernetes_executor.py    |  8 +++++++-
 tests/executors/test_kubernetes_executor.py | 20 ++++++++++++++++++++
 2 files changed, 27 insertions(+), 1 deletion(-)

diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index 88e26be..fd5c6fa 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -490,7 +490,13 @@ class KubernetesExecutor(BaseExecutor, LoggingMixin):
     ) -> None:
         """Executes task asynchronously"""
         self.log.info('Add task %s with command %s with executor_config %s', key, command, executor_config)
-        kube_executor_config = PodGenerator.from_obj(executor_config)
+        try:
+            kube_executor_config = PodGenerator.from_obj(executor_config)
+        except Exception:  # pylint: disable=broad-except
+            self.log.error("Invalid executor_config for %s", key)
+            self.fail(key=key, info="Invalid executor_config passed")
+            return
+
         if executor_config:
             pod_template_file = executor_config.get("pod_template_override", None)
         else:
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index 9abb328..dc7cbbb 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -196,6 +196,26 @@ class TestKubernetesExecutor(unittest.TestCase):
 
     @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
     @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
+    def test_invalid_executor_config(self, mock_get_kube_client, mock_kubernetes_job_watcher):
+        executor = self.kubernetes_executor
+        executor.start()
+
+        assert executor.event_buffer == {}
+        executor.execute_async(
+            key=('dag', 'task', datetime.utcnow(), 1),
+            queue=None,
+            command=['airflow', 'tasks', 'run', 'true', 'some_parameter'],
+            executor_config=k8s.V1Pod(
+                spec=k8s.V1PodSpec(
+                    containers=[k8s.V1Container(name="base", image="myimage", image_pull_policy="Always")]
+                )
+            ),
+        )
+
+        assert list(executor.event_buffer.values())[0][1] == "Invalid executor_config passed"
+
+    @mock.patch('airflow.executors.kubernetes_executor.KubernetesJobWatcher')
+    @mock.patch('airflow.executors.kubernetes_executor.get_kube_client')
     def test_change_state_running(self, mock_get_kube_client, mock_kubernetes_job_watcher):
         executor = self.kubernetes_executor
         executor.start()

Reply via email to