This is an automated email from the ASF dual-hosted git repository. pierrejeambrun pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 4619e9e0ca [AIP-51] Fix calculate_env for non local executor (#28897) 4619e9e0ca is described below commit 4619e9e0ca1222356dce9b2868d89153a705d60a Author: Pierre Jeambrun <pierrejb...@gmail.com> AuthorDate: Fri Jan 13 00:29:22 2023 +0100 [AIP-51] Fix calculate_env for non local executor (#28897) * Fix calculate_env for non local executor * Add test --- airflow/cli/commands/standalone_command.py | 2 +- tests/cli/commands/test_standalone_command.py | 69 +++++++++++++++++++++++++++ 2 files changed, 70 insertions(+), 1 deletion(-) diff --git a/airflow/cli/commands/standalone_command.py b/airflow/cli/commands/standalone_command.py index 9da2a215c9..660e5cc6df 100644 --- a/airflow/cli/commands/standalone_command.py +++ b/airflow/cli/commands/standalone_command.py @@ -158,7 +158,7 @@ class StandaloneCommand: # Make sure we're using a local executor flavour executor_class, _ = ExecutorLoader.import_default_executor_cls() - if executor_class.is_local: + if not executor_class.is_local: if "sqlite" in conf.get("database", "sql_alchemy_conn"): self.print_output("standalone", "Forcing executor to SequentialExecutor") env["AIRFLOW__CORE__EXECUTOR"] = executor_constants.SEQUENTIAL_EXECUTOR diff --git a/tests/cli/commands/test_standalone_command.py b/tests/cli/commands/test_standalone_command.py new file mode 100644 index 0000000000..ec258c5041 --- /dev/null +++ b/tests/cli/commands/test_standalone_command.py @@ -0,0 +1,69 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from unittest import mock + +import pytest + +from airflow.cli.commands.standalone_command import StandaloneCommand +from airflow.executors.executor_constants import ( + CELERY_EXECUTOR, + CELERY_KUBERNETES_EXECUTOR, + DASK_EXECUTOR, + DEBUG_EXECUTOR, + KUBERNETES_EXECUTOR, + LOCAL_EXECUTOR, + LOCAL_KUBERNETES_EXECUTOR, + SEQUENTIAL_EXECUTOR, +) + + +class TestStandaloneCommand: + @pytest.mark.parametrize( + "conf_executor_name, conf_sql_alchemy_conn, expected_standalone_executor", + [ + (LOCAL_EXECUTOR, "sqlite_conn_string", LOCAL_EXECUTOR), + (LOCAL_KUBERNETES_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR), + (SEQUENTIAL_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR), + (CELERY_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR), + (CELERY_KUBERNETES_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR), + (DASK_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR), + (KUBERNETES_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR), + (DEBUG_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR), + (LOCAL_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR), + (LOCAL_KUBERNETES_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR), + (SEQUENTIAL_EXECUTOR, "other_db_conn_string", SEQUENTIAL_EXECUTOR), + (CELERY_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR), + (CELERY_KUBERNETES_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR), + (DASK_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR), + (KUBERNETES_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR), + (DEBUG_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR), + ], + ) + def test_calculate_env(self, conf_executor_name, conf_sql_alchemy_conn, expected_standalone_executor): + """Should always force a local executor compatible with the db.""" + with mock.patch.dict( + "os.environ", + { + "AIRFLOW__CORE__EXECUTOR": conf_executor_name, + "AIRFLOW__DATABASE__SQL_ALCHEMY_CONN": conf_sql_alchemy_conn, + }, + ): + env = StandaloneCommand().calculate_env() + assert env["AIRFLOW__CORE__EXECUTOR"] == expected_standalone_executor