This is an automated email from the ASF dual-hosted git repository.
onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 87b279b301c Update executor loader cache (#55469)
87b279b301c is described below
commit 87b279b301ccd05afe7a86082e4b6a60bebea5b4
Author: Niko Oliveira <[email protected]>
AuthorDate: Fri Sep 12 09:52:04 2025 -0700
Update executor loader cache (#55469)
The executor loader caches the values it reads from config in several
ways to make lookups fast. Now that executors can be configured per team,
the same executor may have multiple instances (one per team), so these
caches must be keyed by team.
Also disallow specifying more than one list of executors for the same
team, including the global (team-less) configuration. Add tests covering
these edge cases and harden test coverage of the multi-team use case in
general.
---
.../src/airflow/executors/executor_loader.py | 63 +++++---
airflow-core/tests/integration/otel/test_otel.py | 4 +-
.../tests/unit/executors/test_executor_loader.py | 165 +++++++++++++++++++++
.../src/tests_common/test_utils/executor_loader.py | 15 +-
4 files changed, 221 insertions(+), 26 deletions(-)
diff --git a/airflow-core/src/airflow/executors/executor_loader.py
b/airflow-core/src/airflow/executors/executor_loader.py
index 49c40e94389..e13a0b80f3f 100644
--- a/airflow-core/src/airflow/executors/executor_loader.py
+++ b/airflow-core/src/airflow/executors/executor_loader.py
@@ -20,6 +20,7 @@ from __future__ import annotations
import logging
import os
+from collections import defaultdict
from typing import TYPE_CHECKING
from airflow.exceptions import AirflowConfigException, UnknownExecutorException
@@ -41,11 +42,11 @@ if TYPE_CHECKING:
# Used to lookup an ExecutorName via a string alias or module path. An
# executor may have both so we need two lookup dicts.
-_alias_to_executors: dict[str, ExecutorName] = {}
-_module_to_executors: dict[str, ExecutorName] = {}
+_alias_to_executors_per_team: dict[str | None, dict[str, ExecutorName]] =
defaultdict(dict)
+_module_to_executors_per_team: dict[str | None, dict[str, ExecutorName]] =
defaultdict(dict)
+_classname_to_executors_per_team: dict[str | None, dict[str, ExecutorName]] =
defaultdict(dict)
# Used to lookup an ExecutorName via the team id.
-_team_name_to_executors: dict[str | None, ExecutorName] = {}
-_classname_to_executors: dict[str, ExecutorName] = {}
+_team_name_to_executors: dict[str | None, list[ExecutorName]] =
defaultdict(list)
# Used to cache the computed ExecutorNames so that we don't need to read/parse
config more than once
_executor_names: list[ExecutorName] = []
@@ -125,13 +126,14 @@ class ExecutorLoader:
for executor_name in executor_names:
# Executors will not always have aliases
if executor_name.alias:
- _alias_to_executors[executor_name.alias] = executor_name
- # All executors will have a team id. It _may_ be None, for now
that means it is a system
- # level executor
- _team_name_to_executors[executor_name.team_name] = executor_name
+
_alias_to_executors_per_team[executor_name.team_name][executor_name.alias] =
executor_name
+ # All executors will have a team name. It _may_ be None, for now
that means it is a system level executor
+
_team_name_to_executors[executor_name.team_name].append(executor_name)
# All executors will have a module path
- _module_to_executors[executor_name.module_path] = executor_name
- _classname_to_executors[executor_name.module_path.split(".")[-1]]
= executor_name
+
_module_to_executors_per_team[executor_name.team_name][executor_name.module_path]
= executor_name
+ _classname_to_executors_per_team[executor_name.team_name][
+ executor_name.module_path.split(".")[-1]
+ ] = executor_name
# Cache the executor names, so the logic of this method only runs
once
_executor_names.append(executor_name)
@@ -167,6 +169,8 @@ class ExecutorLoader:
"The 'executor' key in the 'core' section of the configuration
is mandatory and cannot be empty"
)
configs: list[tuple[str | None, list[str]]] = []
+ seen_teams: set[str | None] = set()
+
# The executor_config can look like a few things. One is just a single
executor name, such as
# "CeleryExecutor". Or a list of executors, such as
"CeleryExecutor,KubernetesExecutor,module.path.to.executor".
# In these cases these are all executors that are available to all
teams, with the first one being the
@@ -178,13 +182,23 @@ class ExecutorLoader:
# The first item in the list may not have a team id (either empty
string before the equal
# sign or no equal sign at all), which means it is a global
executor config.
if "=" not in team_executor_config or
team_executor_config.startswith("="):
- team_executor_config = team_executor_config.strip("=")
- # Split by comma to get the individual executor names and
strip spaces off of them
- configs.append((None, [name.strip() for name in
team_executor_config.split(",")]))
+ team_name = None
+ executor_names = team_executor_config.strip("=")
else:
cls.block_use_of_multi_team()
team_name, executor_names = team_executor_config.split("=")
- configs.append((team_name, [name.strip() for name in
executor_names.split(",")]))
+
+ # Check for duplicate team names
+ if team_name in seen_teams:
+ raise AirflowConfigException(
+ f"Team '{team_name}' appears more than once in executor
configuration. "
+ f"Each team can only be specified once in the executor
config."
+ )
+ seen_teams.add(team_name)
+
+ # Split by comma to get the individual executor names and strip
spaces off of them
+ configs.append((team_name, [name.strip() for name in
executor_names.split(",")]))
+
return configs
@classmethod
@@ -197,14 +211,15 @@ class ExecutorLoader:
return cls._get_executor_names()
@classmethod
- def get_default_executor_name(cls) -> ExecutorName:
+ def get_default_executor_name(cls, team_name: str | None = None) ->
ExecutorName:
"""
Return the default executor name from Airflow configuration.
:return: executor name from Airflow configuration
"""
+ cls._get_executor_names()
# The default executor is the first configured executor in the list
- return cls._get_executor_names()[0]
+ return _team_name_to_executors[team_name][0]
@classmethod
def get_default_executor(cls) -> BaseExecutor:
@@ -230,17 +245,23 @@ class ExecutorLoader:
return loaded_executors
@classmethod
- def lookup_executor_name_by_str(cls, executor_name_str: str) ->
ExecutorName:
+ def lookup_executor_name_by_str(
+ cls, executor_name_str: str, team_name: str | None = None
+ ) -> ExecutorName:
# lookup the executor by alias first, if not check if we're given a
module path
- if not _classname_to_executors or not _module_to_executors or not
_alias_to_executors:
+ if (
+ not _classname_to_executors_per_team
+ or not _module_to_executors_per_team
+ or not _alias_to_executors_per_team
+ ):
# if we haven't loaded the executors yet, such as directly calling
load_executor
cls._get_executor_names()
- if executor_name := _alias_to_executors.get(executor_name_str):
+ if executor_name := _alias_to_executors_per_team.get(team_name,
{}).get(executor_name_str):
return executor_name
- if executor_name := _module_to_executors.get(executor_name_str):
+ if executor_name := _module_to_executors_per_team.get(team_name,
{}).get(executor_name_str):
return executor_name
- if executor_name := _classname_to_executors.get(executor_name_str):
+ if executor_name := _classname_to_executors_per_team.get(team_name,
{}).get(executor_name_str):
return executor_name
raise UnknownExecutorException(f"Unknown executor being loaded:
{executor_name_str}")
diff --git a/airflow-core/tests/integration/otel/test_otel.py
b/airflow-core/tests/integration/otel/test_otel.py
index e218820b292..4dd9874f736 100644
--- a/airflow-core/tests/integration/otel/test_otel.py
+++ b/airflow-core/tests/integration/otel/test_otel.py
@@ -709,7 +709,9 @@ class TestOtelIntegration:
module_path="airflow.providers.celery.executors.celery_executor.CeleryExecutor",
alias="CeleryExecutor",
)
- monkeypatch.setattr(executor_loader, "_alias_to_executors",
{"CeleryExecutor": executor_name})
+ monkeypatch.setattr(
+ executor_loader, "_alias_to_executors_per_team", {None:
{"CeleryExecutor": executor_name}}
+ )
@pytest.fixture(autouse=True)
def cleanup_control_file_if_needed(self):
diff --git a/airflow-core/tests/unit/executors/test_executor_loader.py
b/airflow-core/tests/unit/executors/test_executor_loader.py
index d3495752880..30fb90d0138 100644
--- a/airflow-core/tests/unit/executors/test_executor_loader.py
+++ b/airflow-core/tests/unit/executors/test_executor_loader.py
@@ -395,3 +395,168 @@ class TestExecutorLoader:
executor_loader.ExecutorLoader.load_executor(executor_loader._executor_names[0]),
AwsEcsExecutor,
)
+
+ def test_get_executor_names_set_module_variables(self):
+ with conf_vars(
+ {
+ (
+ "core",
+ "executor",
+ ):
"=CeleryExecutor,LocalExecutor,fake_exec:unit.executors.test_executor_loader.FakeExecutor;team_a=CeleryExecutor,unit.executors.test_executor_loader.FakeExecutor;team_b=fake_exec:unit.executors.test_executor_loader.FakeExecutor"
+ }
+ ):
+ celery_path =
"airflow.providers.celery.executors.celery_executor.CeleryExecutor"
+ local_path = "airflow.executors.local_executor.LocalExecutor"
+ fake_exec_path = "unit.executors.test_executor_loader.FakeExecutor"
+ celery_global = ExecutorName(module_path=celery_path,
alias="CeleryExecutor", team_name=None)
+ local_global = ExecutorName(module_path=local_path,
alias="LocalExecutor", team_name=None)
+ fake_global = ExecutorName(module_path=fake_exec_path,
alias="fake_exec", team_name=None)
+ team_a_celery = ExecutorName(
+ module_path=celery_path,
+ alias="CeleryExecutor",
+ team_name="team_a",
+ )
+ team_a_fake = ExecutorName(
+ module_path=fake_exec_path,
+ team_name="team_a",
+ )
+ team_b_fake = ExecutorName(
+ module_path=fake_exec_path,
+ alias="fake_exec",
+ team_name="team_b",
+ )
+ assert executor_loader._executor_names == []
+ assert executor_loader._alias_to_executors_per_team == {}
+ assert executor_loader._module_to_executors_per_team == {}
+ assert executor_loader._classname_to_executors_per_team == {}
+ assert executor_loader._team_name_to_executors == {}
+ with mock.patch.object(executor_loader.ExecutorLoader,
"block_use_of_multi_team"):
+ executor_loader.ExecutorLoader._get_executor_names()
+ assert executor_loader._executor_names == [
+ celery_global,
+ local_global,
+ fake_global,
+ team_a_celery,
+ team_a_fake,
+ team_b_fake,
+ ]
+ assert executor_loader._alias_to_executors_per_team == {
+ None: {
+ "CeleryExecutor": celery_global,
+ "LocalExecutor": local_global,
+ "fake_exec": fake_global,
+ },
+ "team_a": {"CeleryExecutor": team_a_celery},
+ "team_b": {"fake_exec": team_b_fake},
+ }
+ assert executor_loader._module_to_executors_per_team == {
+ None: {
+ celery_path: celery_global,
+ local_path: local_global,
+ fake_exec_path: fake_global,
+ },
+ "team_a": {
+ celery_path: team_a_celery,
+ fake_exec_path: team_a_fake,
+ },
+ "team_b": {
+ fake_exec_path: team_b_fake,
+ },
+ }
+ assert executor_loader._classname_to_executors_per_team == {
+ None: {
+ "CeleryExecutor": celery_global,
+ "LocalExecutor": local_global,
+ "FakeExecutor": fake_global,
+ },
+ "team_a": {
+ "CeleryExecutor": team_a_celery,
+ "FakeExecutor": team_a_fake,
+ },
+ "team_b": {
+ "FakeExecutor": team_b_fake,
+ },
+ }
+ assert executor_loader._team_name_to_executors == {
+ None: [celery_global, local_global, fake_global],
+ "team_a": [team_a_celery, team_a_fake],
+ "team_b": [team_b_fake],
+ }
+
+ @pytest.mark.parametrize(
+ "executor_config",
+ [
+ "team1=CeleryExecutor;team1=LocalExecutor",
+
"team1=CeleryExecutor;team2=LocalExecutor;team1=KubernetesExecutor",
+ "CeleryExecutor;team1=LocalExecutor;team1=KubernetesExecutor",
+
"team_a=CeleryExecutor;team_b=LocalExecutor;team_a=KubernetesExecutor",
+ ],
+ )
+ def test_duplicate_team_names_should_fail(self, executor_config):
+ """Test that duplicate team names in executor configuration raise an
exception."""
+ with mock.patch.object(executor_loader.ExecutorLoader,
"block_use_of_multi_team"):
+ with conf_vars({("core", "executor"): executor_config}):
+ with pytest.raises(
+ AirflowConfigException,
+ match=r"Team '.+' appears more than once in executor
configuration",
+ ):
+ executor_loader.ExecutorLoader._get_team_executor_configs()
+
+ @pytest.mark.parametrize(
+ "executor_config",
+ [
+ "CeleryExecutor;LocalExecutor", # Two separate global teams
+ "CeleryExecutor;KubernetesExecutor;LocalExecutor", # Three
separate global teams
+ "=CeleryExecutor;LocalExecutor", # Explicit global team followed
by another global team
+ "CeleryExecutor;=LocalExecutor", # Global team followed by
explicit global team
+ ],
+ )
+ def test_multiple_global_team_specifications_should_fail(self,
executor_config):
+ """Test that multiple global team specifications raise an exception.
+
+ Only one global team specification should be allowed (comma-delimited
executors),
+ not multiple semicolon-separated global teams.
+ """
+ with conf_vars({("core", "executor"): executor_config}):
+ with pytest.raises(
+ AirflowConfigException, match=r"Team 'None' appears more than
once in executor configuration"
+ ):
+ executor_loader.ExecutorLoader._get_team_executor_configs()
+
+ def test_valid_team_configurations_order_preservation(self):
+ """Test that valid team configurations preserve order and work
correctly."""
+ executor_config =
"LocalExecutor;team1=CeleryExecutor,KubernetesExecutor;team2=LocalExecutor"
+ expected_configs = [
+ (None, ["LocalExecutor"]),
+ ("team1", ["CeleryExecutor", "KubernetesExecutor"]),
+ ("team2", ["LocalExecutor"]),
+ ]
+
+ with mock.patch.object(executor_loader.ExecutorLoader,
"block_use_of_multi_team"):
+ with conf_vars({("core", "executor"): executor_config}):
+ configs =
executor_loader.ExecutorLoader._get_team_executor_configs()
+ assert configs == expected_configs
+
+ @pytest.mark.parametrize(
+ ("executor_config", "expected_configs"),
+ [
+ # Single global team with one executor
+ ("CeleryExecutor", [(None, ["CeleryExecutor"])]),
+ # Single global team with multiple comma-delimited executors
+ ("CeleryExecutor,LocalExecutor", [(None, ["CeleryExecutor",
"LocalExecutor"])]),
+ (
+ "CeleryExecutor,LocalExecutor,KubernetesExecutor",
+ [(None, ["CeleryExecutor", "LocalExecutor",
"KubernetesExecutor"])],
+ ),
+ # Single global team with explicit = prefix
+ ("=CeleryExecutor,LocalExecutor", [(None, ["CeleryExecutor",
"LocalExecutor"])]),
+ ],
+ )
+ def test_single_global_team_configurations_work(self, executor_config,
expected_configs):
+ """Test that single global team configurations work correctly.
+
+ A single global team can have multiple executors specified as
comma-delimited list.
+ """
+ with conf_vars({("core", "executor"): executor_config}):
+ configs =
executor_loader.ExecutorLoader._get_team_executor_configs()
+ assert configs == expected_configs
diff --git a/devel-common/src/tests_common/test_utils/executor_loader.py
b/devel-common/src/tests_common/test_utils/executor_loader.py
index 6374b381e1d..69466ba1103 100644
--- a/devel-common/src/tests_common/test_utils/executor_loader.py
+++ b/devel-common/src/tests_common/test_utils/executor_loader.py
@@ -17,6 +17,7 @@
# under the License.
from __future__ import annotations
+from collections import defaultdict
from typing import TYPE_CHECKING
import airflow.executors.executor_loader as executor_loader
@@ -27,8 +28,14 @@ if TYPE_CHECKING:
def clean_executor_loader_module():
"""Clean the executor_loader state, as it stores global variables in the
module, causing side effects for some tests."""
- executor_loader._alias_to_executors: dict[str, ExecutorName] = {}
- executor_loader._module_to_executors: dict[str, ExecutorName] = {}
- executor_loader._team_name_to_executors: dict[str | None, ExecutorName] =
{}
- executor_loader._classname_to_executors: dict[str, ExecutorName] = {}
+ executor_loader._alias_to_executors_per_team: dict[str | None, dict[str,
ExecutorName]] = defaultdict(
+ dict
+ )
+ executor_loader._module_to_executors_per_team: dict[str | None, dict[str,
ExecutorName]] = defaultdict(
+ dict
+ )
+ executor_loader._classname_to_executors_per_team: dict[str | None,
dict[str, ExecutorName]] = defaultdict(
+ dict
+ )
+ executor_loader._team_name_to_executors: dict[str | None,
list[ExecutorName]] = defaultdict(list)
executor_loader._executor_names: list[ExecutorName] = []