This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 4619e9e0ca [AIP-51] Fix calculate_env for non local executor (#28897)
4619e9e0ca is described below

commit 4619e9e0ca1222356dce9b2868d89153a705d60a
Author: Pierre Jeambrun <pierrejb...@gmail.com>
AuthorDate: Fri Jan 13 00:29:22 2023 +0100

    [AIP-51] Fix calculate_env for non local executor (#28897)
    
    * Fix calculate_env for non local executor
    
    * Add test
---
 airflow/cli/commands/standalone_command.py    |  2 +-
 tests/cli/commands/test_standalone_command.py | 69 +++++++++++++++++++++++++++
 2 files changed, 70 insertions(+), 1 deletion(-)

diff --git a/airflow/cli/commands/standalone_command.py 
b/airflow/cli/commands/standalone_command.py
index 9da2a215c9..660e5cc6df 100644
--- a/airflow/cli/commands/standalone_command.py
+++ b/airflow/cli/commands/standalone_command.py
@@ -158,7 +158,7 @@ class StandaloneCommand:
 
         # Make sure we're using a local executor flavour
         executor_class, _ = ExecutorLoader.import_default_executor_cls()
-        if executor_class.is_local:
+        if not executor_class.is_local:
             if "sqlite" in conf.get("database", "sql_alchemy_conn"):
                 self.print_output("standalone", "Forcing executor to 
SequentialExecutor")
                 env["AIRFLOW__CORE__EXECUTOR"] = 
executor_constants.SEQUENTIAL_EXECUTOR
diff --git a/tests/cli/commands/test_standalone_command.py 
b/tests/cli/commands/test_standalone_command.py
new file mode 100644
index 0000000000..ec258c5041
--- /dev/null
+++ b/tests/cli/commands/test_standalone_command.py
@@ -0,0 +1,69 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest import mock
+
+import pytest
+
+from airflow.cli.commands.standalone_command import StandaloneCommand
+from airflow.executors.executor_constants import (
+    CELERY_EXECUTOR,
+    CELERY_KUBERNETES_EXECUTOR,
+    DASK_EXECUTOR,
+    DEBUG_EXECUTOR,
+    KUBERNETES_EXECUTOR,
+    LOCAL_EXECUTOR,
+    LOCAL_KUBERNETES_EXECUTOR,
+    SEQUENTIAL_EXECUTOR,
+)
+
+
+class TestStandaloneCommand:
+    @pytest.mark.parametrize(
+        "conf_executor_name, conf_sql_alchemy_conn, 
expected_standalone_executor",
+        [
+            (LOCAL_EXECUTOR, "sqlite_conn_string", LOCAL_EXECUTOR),
+            (LOCAL_KUBERNETES_EXECUTOR, "sqlite_conn_string", 
SEQUENTIAL_EXECUTOR),
+            (SEQUENTIAL_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR),
+            (CELERY_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR),
+            (CELERY_KUBERNETES_EXECUTOR, "sqlite_conn_string", 
SEQUENTIAL_EXECUTOR),
+            (DASK_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR),
+            (KUBERNETES_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR),
+            (DEBUG_EXECUTOR, "sqlite_conn_string", SEQUENTIAL_EXECUTOR),
+            (LOCAL_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR),
+            (LOCAL_KUBERNETES_EXECUTOR, "other_db_conn_string", 
LOCAL_EXECUTOR),
+            (SEQUENTIAL_EXECUTOR, "other_db_conn_string", SEQUENTIAL_EXECUTOR),
+            (CELERY_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR),
+            (CELERY_KUBERNETES_EXECUTOR, "other_db_conn_string", 
LOCAL_EXECUTOR),
+            (DASK_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR),
+            (KUBERNETES_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR),
+            (DEBUG_EXECUTOR, "other_db_conn_string", LOCAL_EXECUTOR),
+        ],
+    )
+    def test_calculate_env(self, conf_executor_name, conf_sql_alchemy_conn, 
expected_standalone_executor):
+        """Should always force a local executor compatible with the db."""
+        with mock.patch.dict(
+            "os.environ",
+            {
+                "AIRFLOW__CORE__EXECUTOR": conf_executor_name,
+                "AIRFLOW__DATABASE__SQL_ALCHEMY_CONN": conf_sql_alchemy_conn,
+            },
+        ):
+            env = StandaloneCommand().calculate_env()
+            assert env["AIRFLOW__CORE__EXECUTOR"] == 
expected_standalone_executor

Reply via email to