This is an automated email from the ASF dual-hosted git repository.

onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 4b0a33e5797 Verify team existence on Execution loading (#55806)
4b0a33e5797 is described below

commit 4b0a33e579775968ed660ac458a36ebd3c8b1eed
Author: Niko Oliveira <[email protected]>
AuthorDate: Mon Sep 22 12:28:39 2025 -0700

    Verify team existence on Execution loading (#55806)
    
    If bad configuration was provided (user tried to configure an executor
    for a team that does not exist) then throw an exception and do not allow
    execution loading to complete (and Airflow Scheduler to start).
---
 .../src/airflow/executors/executor_loader.py       | 30 ++++++++++
 airflow-core/src/airflow/models/team.py            | 37 +++++++++++-
 .../tests/unit/executors/test_executor_loader.py   | 64 +++++++++++++++++++-
 airflow-core/tests/unit/models/test_team.py        | 70 ++++++++++++++++++++++
 4 files changed, 197 insertions(+), 4 deletions(-)

diff --git a/airflow-core/src/airflow/executors/executor_loader.py b/airflow-core/src/airflow/executors/executor_loader.py
index be5305c06ba..373f8938cf4 100644
--- a/airflow-core/src/airflow/executors/executor_loader.py
+++ b/airflow-core/src/airflow/executors/executor_loader.py
@@ -32,6 +32,7 @@ from airflow.executors.executor_constants import (
     ConnectorSource,
 )
 from airflow.executors.executor_utils import ExecutorName
+from airflow.models.team import Team
 from airflow.utils.module_loading import import_string
 
 log = logging.getLogger(__name__)
@@ -153,6 +154,30 @@ class ExecutorLoader:
         if not team_dev_mode or team_dev_mode != "enabled":
             raise AirflowConfigException("Configuring multiple team based executors is not yet supported!")
 
+    @classmethod
+    def _validate_teams_exist_in_database(cls, team_names: set[str]) -> None:
+        """
+        Validate that all specified team names exist in the database.
+
+        :param team_names: Set of team names to validate
+        :raises AirflowConfigException: If any team names don't exist in the database
+        """
+        if not team_names:
+            return
+
+        existing_teams = Team.get_all_team_names()
+
+        missing_teams = team_names - existing_teams
+
+        if missing_teams:
+            missing_teams_list = sorted(missing_teams)
+            missing_teams_str = ", ".join(missing_teams_list)
+
+            raise AirflowConfigException(
+                f"One or more teams specified in executor configuration do not exist in database: {missing_teams_str}. "
+                "Please create these teams first or remove them from executor configuration."
+            )
+
     @classmethod
     def _get_team_executor_configs(cls) -> list[tuple[str | None, list[str]]]:
         """
@@ -208,6 +233,11 @@ class ExecutorLoader:
                 "'CeleryExecutor;team1=LocalExecutor' instead of 'team1=CeleryExecutor;team2=LocalExecutor')."
             )
 
+        # Validate that all team names exist in the database (excluding None for global configs)
+        team_names_to_validate = {team_name for team_name in seen_teams if team_name is not None}
+        if team_names_to_validate:
+            cls._validate_teams_exist_in_database(team_names_to_validate)
+
         return configs
 
     @classmethod
diff --git a/airflow-core/src/airflow/models/team.py b/airflow-core/src/airflow/models/team.py
index f25502d4cda..7609f3ff8b2 100644
--- a/airflow-core/src/airflow/models/team.py
+++ b/airflow-core/src/airflow/models/team.py
@@ -17,12 +17,18 @@
 # under the License.
 from __future__ import annotations
 
+from typing import TYPE_CHECKING
+
 import uuid6
-from sqlalchemy import Column, ForeignKey, Index, String, Table
+from sqlalchemy import Column, ForeignKey, Index, String, Table, select
 from sqlalchemy.orm import relationship
 from sqlalchemy_utils import UUIDType
 
 from airflow.models.base import Base
+from airflow.utils.session import NEW_SESSION, provide_session
+
+if TYPE_CHECKING:
+    from sqlalchemy.orm import Session
 
 dag_bundle_team_association_table = Table(
     "dag_bundle_team",
@@ -51,3 +57,32 @@ class Team(Base):
 
     def __repr__(self):
         return f"Team(id={self.id},name={self.name})"
+
+    @classmethod
+    @provide_session
+    def get_all_teams_id_to_name_mapping(cls, session: Session = NEW_SESSION) -> dict[str, str]:
+        """
+        Return a mapping of all team IDs to team names from the database.
+
+        This method provides a reusable way to get team information that can be used
+        across the codebase for validation and lookups.
+
+        :param session: Database session
+        :return: Dictionary mapping team UUIDs to team names
+        """
+        stmt = select(cls.id, cls.name)
+        teams = session.execute(stmt).all()
+        return {str(team_id): team_name for team_id, team_name in teams}
+
+    @classmethod
+    def get_all_team_names(cls) -> set[str]:
+        """
+        Return a set of all team names from the database.
+
+        This method provides a convenient way to get just the team names for validation
+        purposes, such as verifying team names in executor configurations.
+
+        :return: Set of all team names
+        """
+        team_mapping = cls.get_all_teams_id_to_name_mapping()
+        return set(team_mapping.values())
diff --git a/airflow-core/tests/unit/executors/test_executor_loader.py b/airflow-core/tests/unit/executors/test_executor_loader.py
index 1d19db86b4c..740ed70526c 100644
--- a/airflow-core/tests/unit/executors/test_executor_loader.py
+++ b/airflow-core/tests/unit/executors/test_executor_loader.py
@@ -235,7 +235,10 @@ class TestExecutorLoader:
     )
     def test_get_hybrid_executors_from_configs(self, executor_config, expected_executors_list):
         # Mock the blocking method for tests that involve actual team configurations
-        with mock.patch.object(executor_loader.ExecutorLoader, "block_use_of_multi_team"):
+        with (
+            mock.patch.object(executor_loader.ExecutorLoader, "block_use_of_multi_team"),
+            mock.patch.object(executor_loader.ExecutorLoader, "_validate_teams_exist_in_database"),
+        ):
             with conf_vars({("core", "executor"): executor_config}):
                 executors = executor_loader.ExecutorLoader._get_executor_names()
                 assert executors == expected_executors_list
@@ -419,7 +422,10 @@ class TestExecutorLoader:
             assert executor_loader._module_to_executors_per_team == {}
             assert executor_loader._classname_to_executors_per_team == {}
             assert executor_loader._team_name_to_executors == {}
-            with mock.patch.object(executor_loader.ExecutorLoader, "block_use_of_multi_team"):
+            with (
+                mock.patch.object(executor_loader.ExecutorLoader, "block_use_of_multi_team"),
+                mock.patch.object(executor_loader.ExecutorLoader, "_validate_teams_exist_in_database"),
+            ):
                 executor_loader.ExecutorLoader._get_executor_names()
             assert executor_loader._executor_names == [
                 celery_global,
@@ -519,7 +525,10 @@ class TestExecutorLoader:
             ("team2", ["LocalExecutor"]),
         ]
 
-        with mock.patch.object(executor_loader.ExecutorLoader, "block_use_of_multi_team"):
+        with (
+            mock.patch.object(executor_loader.ExecutorLoader, "block_use_of_multi_team"),
+            mock.patch.object(executor_loader.ExecutorLoader, "_validate_teams_exist_in_database"),
+        ):
             with conf_vars({("core", "executor"): executor_config}):
                 configs = executor_loader.ExecutorLoader._get_team_executor_configs()
                 assert configs == expected_configs
@@ -567,3 +576,52 @@ class TestExecutorLoader:
                 AirflowConfigException, match=r"At least one global executor must be configured"
             ):
                 executor_loader.ExecutorLoader._get_team_executor_configs()
+
+    def test_team_validation_with_valid_teams_in_config(self):
+        """Test that executor config with valid teams loads successfully."""
+        mock_team_names = {"team_a", "team_b"}
+
+        with (
+            patch.object(executor_loader.Team, "get_all_team_names", return_value=mock_team_names),
+            mock.patch.object(executor_loader.ExecutorLoader, "block_use_of_multi_team"),
+        ):
+            with conf_vars(
+                {("core", "executor"): "=CeleryExecutor;team_a=CeleryExecutor;team_b=LocalExecutor"}
+            ):
+                configs = executor_loader.ExecutorLoader._get_team_executor_configs()
+
+                assert len(configs) == 3
+                assert configs[0] == (None, ["CeleryExecutor"])
+                assert configs[1] == ("team_a", ["CeleryExecutor"])
+                assert configs[2] == ("team_b", ["LocalExecutor"])
+
+    def test_team_validation_with_invalid_teams_in_config(self):
+        """Test that executor config with invalid teams fails with clear error."""
+        mock_team_names = {"team_a"}  # team_b and team_c are missing
+
+        with (
+            patch.object(executor_loader.Team, "get_all_team_names", return_value=mock_team_names),
+            mock.patch.object(executor_loader.ExecutorLoader, "block_use_of_multi_team"),
+        ):
+            with conf_vars(
+                {
+                    (
+                        "core",
+                        "executor",
+                    ): "=CeleryExecutor;team_a=CeleryExecutor;team_b=LocalExecutor;team_c=KubernetesExecutor"
+                }
+            ):
+                with pytest.raises(AirflowConfigException):
+                    executor_loader.ExecutorLoader._get_team_executor_configs()
+
+    def test_team_validation_skips_global_teams(self):
+        """Test that team validation does not validate global teams."""
+        with patch.object(executor_loader.Team, "get_all_team_names") as mock_get_team_names:
+            with conf_vars({("core", "executor"): "CeleryExecutor,LocalExecutor"}):
+                configs = executor_loader.ExecutorLoader._get_team_executor_configs()
+
+                assert len(configs) == 1
+                assert configs[0] == (None, ["CeleryExecutor", "LocalExecutor"])
+
+                # No team validation should occur since only global teams are configured
+                mock_get_team_names.assert_not_called()
diff --git a/airflow-core/tests/unit/models/test_team.py b/airflow-core/tests/unit/models/test_team.py
new file mode 100644
index 00000000000..9c76e752286
--- /dev/null
+++ b/airflow-core/tests/unit/models/test_team.py
@@ -0,0 +1,70 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import uuid
+from unittest.mock import MagicMock, patch
+
+from airflow.models.team import Team
+
+
+class TestTeam:
+    """Unit tests for Team model class methods."""
+
+    def test_get_all_teams_id_to_name_mapping_with_teams(self):
+        """Test get_all_teams_id_to_name_mapping returns correct mapping when teams exist."""
+        team1_id = str(uuid.uuid4())
+        team2_id = str(uuid.uuid4())
+        team3_id = str(uuid.uuid4())
+
+        mock_query_result = [
+            (team1_id, "team_alpha"),
+            (team2_id, "team_beta"),
+            (team3_id, "team_gamma"),
+        ]
+
+        # Create a mock session
+        mock_session = MagicMock()
+        mock_session.execute.return_value.all.return_value = mock_query_result
+
+        # Call the method directly with our mock session
+        result = Team.get_all_teams_id_to_name_mapping(session=mock_session)
+
+        expected = {
+            team1_id: "team_alpha",
+            team2_id: "team_beta",
+            team3_id: "team_gamma",
+        }
+        assert result == expected
+
+        # Verify the execute method was called correctly
+        mock_session.execute.assert_called_once()
+        mock_session.execute.return_value.all.assert_called_once()
+
+    def test_get_all_team_names_with_teams(self):
+        """Test get_all_team_names returns correct set of names when teams exist."""
+        mock_mapping = {
+            "uuid-1": "team_alpha",
+            "uuid-2": "team_beta",
+            "uuid-3": "team_gamma",
+        }
+
+        with patch.object(Team, "get_all_teams_id_to_name_mapping", return_value=mock_mapping):
+            result = Team.get_all_team_names()
+
+            assert result == {"team_alpha", "team_beta", "team_gamma"}
+            assert isinstance(result, set)

Reply via email to