This is an automated email from the ASF dual-hosted git repository.
onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 4b0a33e5797 Verify team existence on Execution loading (#55806)
4b0a33e5797 is described below
commit 4b0a33e579775968ed660ac458a36ebd3c8b1eed
Author: Niko Oliveira <[email protected]>
AuthorDate: Mon Sep 22 12:28:39 2025 -0700
Verify team existence on Execution loading (#55806)
If an invalid configuration is provided (the user tried to configure an executor
for a team that does not exist), raise an exception and do not allow
executor loading to complete (and therefore do not allow the Airflow Scheduler to start).
---
.../src/airflow/executors/executor_loader.py | 30 ++++++++++
airflow-core/src/airflow/models/team.py | 37 +++++++++++-
.../tests/unit/executors/test_executor_loader.py | 64 +++++++++++++++++++-
airflow-core/tests/unit/models/test_team.py | 70 ++++++++++++++++++++++
4 files changed, 197 insertions(+), 4 deletions(-)
diff --git a/airflow-core/src/airflow/executors/executor_loader.py
b/airflow-core/src/airflow/executors/executor_loader.py
index be5305c06ba..373f8938cf4 100644
--- a/airflow-core/src/airflow/executors/executor_loader.py
+++ b/airflow-core/src/airflow/executors/executor_loader.py
@@ -32,6 +32,7 @@ from airflow.executors.executor_constants import (
ConnectorSource,
)
from airflow.executors.executor_utils import ExecutorName
+from airflow.models.team import Team
from airflow.utils.module_loading import import_string
log = logging.getLogger(__name__)
@@ -153,6 +154,30 @@ class ExecutorLoader:
if not team_dev_mode or team_dev_mode != "enabled":
raise AirflowConfigException("Configuring multiple team based
executors is not yet supported!")
@classmethod
def _validate_teams_exist_in_database(cls, team_names: set[str]) -> None:
    """
    Ensure every team named in the executor configuration has a row in the database.

    An empty ``team_names`` set is accepted immediately without querying the database.

    :param team_names: Team names taken from the executor configuration
    :raises AirflowConfigException: If at least one of ``team_names`` is not a known team
    """
    # Nothing to check, so skip the database round-trip entirely.
    if not team_names:
        return

    # NOTE(review): this queries the metadata DB while executors are being loaded;
    # if the DB is unreachable, that error surfaces instead of AirflowConfigException.
    unknown = team_names - Team.get_all_team_names()
    if unknown:
        # Sort for a deterministic, readable error message.
        missing = ", ".join(sorted(unknown))
        raise AirflowConfigException(
            f"One or more teams specified in executor configuration do not exist in database: {missing}. "
            "Please create these teams first or remove them from executor configuration."
        )
+
@classmethod
def _get_team_executor_configs(cls) -> list[tuple[str | None, list[str]]]:
"""
@@ -208,6 +233,11 @@ class ExecutorLoader:
"'CeleryExecutor;team1=LocalExecutor' instead of
'team1=CeleryExecutor;team2=LocalExecutor')."
)
+ # Validate that all team names exist in the database (excluding None
for global configs)
+ team_names_to_validate = {team_name for team_name in seen_teams if
team_name is not None}
+ if team_names_to_validate:
+ cls._validate_teams_exist_in_database(team_names_to_validate)
+
return configs
@classmethod
diff --git a/airflow-core/src/airflow/models/team.py
b/airflow-core/src/airflow/models/team.py
index f25502d4cda..7609f3ff8b2 100644
--- a/airflow-core/src/airflow/models/team.py
+++ b/airflow-core/src/airflow/models/team.py
@@ -17,12 +17,18 @@
# under the License.
from __future__ import annotations
+from typing import TYPE_CHECKING
+
import uuid6
-from sqlalchemy import Column, ForeignKey, Index, String, Table
+from sqlalchemy import Column, ForeignKey, Index, String, Table, select
from sqlalchemy.orm import relationship
from sqlalchemy_utils import UUIDType
from airflow.models.base import Base
+from airflow.utils.session import NEW_SESSION, provide_session
+
+if TYPE_CHECKING:
+ from sqlalchemy.orm import Session
dag_bundle_team_association_table = Table(
"dag_bundle_team",
@@ -51,3 +57,32 @@ class Team(Base):
def __repr__(self):
    """Return a compact debug representation showing the team's ``id`` and ``name``."""
    return f"Team(id={self.id},name={self.name})"
+
@classmethod
@provide_session
def get_all_teams_id_to_name_mapping(cls, session: Session = NEW_SESSION) -> dict[str, str]:
    """
    Build a lookup from team ID (as a string) to team name for every team row.

    Intended as a shared helper for validation and lookups elsewhere in the codebase.

    :param session: Database session (supplied by ``provide_session`` when omitted)
    :return: Dictionary whose keys are ``str(team id)`` and whose values are team names
    """
    mapping: dict[str, str] = {}
    for team_id, team_name in session.execute(select(cls.id, cls.name)).all():
        # Normalize IDs with str() so callers always receive plain string keys.
        mapping[str(team_id)] = team_name
    return mapping
+
@classmethod
def get_all_team_names(cls, session: Session | None = None) -> set[str]:
    """
    Return the set of all team names from the database.

    Convenience wrapper around ``get_all_teams_id_to_name_mapping`` for validation
    purposes, such as verifying team names in executor configurations.

    :param session: Optional existing database session to reuse. When omitted, the
        underlying helper obtains its own session.
    :return: Set of all team names
    """
    # Forward ``session`` only when the caller supplied one. Presumably
    # ``provide_session`` injects a session only when the keyword is absent,
    # so passing ``session=None`` explicitly could bypass it — confirm.
    if session is None:
        team_mapping = cls.get_all_teams_id_to_name_mapping()
    else:
        team_mapping = cls.get_all_teams_id_to_name_mapping(session=session)
    return set(team_mapping.values())
diff --git a/airflow-core/tests/unit/executors/test_executor_loader.py
b/airflow-core/tests/unit/executors/test_executor_loader.py
index 1d19db86b4c..740ed70526c 100644
--- a/airflow-core/tests/unit/executors/test_executor_loader.py
+++ b/airflow-core/tests/unit/executors/test_executor_loader.py
@@ -235,7 +235,10 @@ class TestExecutorLoader:
)
def test_get_hybrid_executors_from_configs(self, executor_config,
expected_executors_list):
# Mock the blocking method for tests that involve actual team
configurations
- with mock.patch.object(executor_loader.ExecutorLoader,
"block_use_of_multi_team"):
+ with (
+ mock.patch.object(executor_loader.ExecutorLoader,
"block_use_of_multi_team"),
+ mock.patch.object(executor_loader.ExecutorLoader,
"_validate_teams_exist_in_database"),
+ ):
with conf_vars({("core", "executor"): executor_config}):
executors =
executor_loader.ExecutorLoader._get_executor_names()
assert executors == expected_executors_list
@@ -419,7 +422,10 @@ class TestExecutorLoader:
assert executor_loader._module_to_executors_per_team == {}
assert executor_loader._classname_to_executors_per_team == {}
assert executor_loader._team_name_to_executors == {}
- with mock.patch.object(executor_loader.ExecutorLoader,
"block_use_of_multi_team"):
+ with (
+ mock.patch.object(executor_loader.ExecutorLoader,
"block_use_of_multi_team"),
+ mock.patch.object(executor_loader.ExecutorLoader,
"_validate_teams_exist_in_database"),
+ ):
executor_loader.ExecutorLoader._get_executor_names()
assert executor_loader._executor_names == [
celery_global,
@@ -519,7 +525,10 @@ class TestExecutorLoader:
("team2", ["LocalExecutor"]),
]
- with mock.patch.object(executor_loader.ExecutorLoader,
"block_use_of_multi_team"):
+ with (
+ mock.patch.object(executor_loader.ExecutorLoader,
"block_use_of_multi_team"),
+ mock.patch.object(executor_loader.ExecutorLoader,
"_validate_teams_exist_in_database"),
+ ):
with conf_vars({("core", "executor"): executor_config}):
configs =
executor_loader.ExecutorLoader._get_team_executor_configs()
assert configs == expected_configs
@@ -567,3 +576,52 @@ class TestExecutorLoader:
AirflowConfigException, match=r"At least one global executor
must be configured"
):
executor_loader.ExecutorLoader._get_team_executor_configs()
+
def test_team_validation_with_valid_teams_in_config(self):
    """Executor config whose teams all exist in the database loads successfully."""
    mock_team_names = {"team_a", "team_b"}

    # Use ``mock.patch.object`` like the rest of this module; the multi-team guard is
    # stubbed so only the team-existence validation is exercised.
    with (
        mock.patch.object(
            executor_loader.Team, "get_all_team_names", return_value=mock_team_names
        ) as mock_get_team_names,
        mock.patch.object(executor_loader.ExecutorLoader, "block_use_of_multi_team"),
    ):
        with conf_vars(
            {("core", "executor"): "=CeleryExecutor;team_a=CeleryExecutor;team_b=LocalExecutor"}
        ):
            configs = executor_loader.ExecutorLoader._get_team_executor_configs()

        # Team names are configured, so the database lookup must actually happen.
        mock_get_team_names.assert_called_once_with()

    assert configs == [
        (None, ["CeleryExecutor"]),
        ("team_a", ["CeleryExecutor"]),
        ("team_b", ["LocalExecutor"]),
    ]
+
def test_team_validation_with_invalid_teams_in_config(self):
    """Executor config referencing unknown teams fails and names every missing team."""
    mock_team_names = {"team_a"}  # team_b and team_c are missing

    with (
        mock.patch.object(executor_loader.Team, "get_all_team_names", return_value=mock_team_names),
        mock.patch.object(executor_loader.ExecutorLoader, "block_use_of_multi_team"),
    ):
        with conf_vars(
            {
                (
                    "core",
                    "executor",
                ): "=CeleryExecutor;team_a=CeleryExecutor;team_b=LocalExecutor;team_c=KubernetesExecutor"
            }
        ):
            # Pin the specific failure: the missing teams are reported sorted and
            # comma-separated, and the existing ``team_a`` is not listed.
            with pytest.raises(
                AirflowConfigException,
                match=r"do not exist in database: team_b, team_c\.",
            ):
                executor_loader.ExecutorLoader._get_team_executor_configs()
+
def test_team_validation_skips_global_teams(self):
    """Team validation does not query teams when only global executors are configured."""
    with mock.patch.object(executor_loader.Team, "get_all_team_names") as mock_get_team_names:
        with conf_vars({("core", "executor"): "CeleryExecutor,LocalExecutor"}):
            configs = executor_loader.ExecutorLoader._get_team_executor_configs()

    assert configs == [(None, ["CeleryExecutor", "LocalExecutor"])]

    # No team validation should occur since only global executors are configured.
    mock_get_team_names.assert_not_called()
diff --git a/airflow-core/tests/unit/models/test_team.py
b/airflow-core/tests/unit/models/test_team.py
new file mode 100644
index 00000000000..9c76e752286
--- /dev/null
+++ b/airflow-core/tests/unit/models/test_team.py
@@ -0,0 +1,70 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import uuid
+from unittest.mock import MagicMock, patch
+
+from airflow.models.team import Team
+
+
class TestTeam:
    """Unit tests for Team model class methods."""

    def test_get_all_teams_id_to_name_mapping_with_teams(self):
        """Mapping keys are stringified team IDs and values are team names."""
        # Use real UUID objects (not pre-stringified values) so the test actually
        # exercises the ``str(team_id)`` conversion performed by the method.
        team1_id = uuid.uuid4()
        team2_id = uuid.uuid4()
        team3_id = uuid.uuid4()

        mock_session = MagicMock()
        mock_session.execute.return_value.all.return_value = [
            (team1_id, "team_alpha"),
            (team2_id, "team_beta"),
            (team3_id, "team_gamma"),
        ]

        # Call the method directly with our mock session.
        result = Team.get_all_teams_id_to_name_mapping(session=mock_session)

        assert result == {
            str(team1_id): "team_alpha",
            str(team2_id): "team_beta",
            str(team3_id): "team_gamma",
        }
        assert all(isinstance(key, str) for key in result)

        # Exactly one query is issued and its rows are fetched once.
        mock_session.execute.assert_called_once()
        mock_session.execute.return_value.all.assert_called_once()

    def test_get_all_teams_id_to_name_mapping_without_teams(self):
        """An empty query result yields an empty mapping."""
        mock_session = MagicMock()
        mock_session.execute.return_value.all.return_value = []

        assert Team.get_all_teams_id_to_name_mapping(session=mock_session) == {}

    def test_get_all_team_names_with_teams(self):
        """get_all_team_names returns the set of names from the ID-to-name mapping."""
        mock_mapping = {
            "uuid-1": "team_alpha",
            "uuid-2": "team_beta",
            "uuid-3": "team_gamma",
        }

        with patch.object(Team, "get_all_teams_id_to_name_mapping", return_value=mock_mapping):
            result = Team.get_all_team_names()

        assert result == {"team_alpha", "team_beta", "team_gamma"}
        assert isinstance(result, set)

    def test_get_all_team_names_without_teams(self):
        """get_all_team_names returns an empty set when no teams exist."""
        with patch.object(Team, "get_all_teams_id_to_name_mapping", return_value={}):
            result = Team.get_all_team_names()

        assert result == set()
        assert isinstance(result, set)