This is an automated email from the ASF dual-hosted git repository.
onikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 2f04eb2a122 CLI commands for multi team (#55283)
2f04eb2a122 is described below
commit 2f04eb2a12299887166a212aca8999523ff6f9d3
Author: Niko Oliveira <[email protected]>
AuthorDate: Wed Sep 10 15:24:26 2025 -0700
CLI commands for multi team (#55283)
CLI commands for Create, List, Delete of teams. These are delivered as
CLI commands instead of API because they are administrator only (similar
to `airflow db reset`). These are not commands that DAG authors should
be regularly running (therefore no APIs implemented)
---
airflow-core/src/airflow/cli/cli_config.py | 27 ++
.../src/airflow/cli/commands/team_command.py | 152 +++++++++
airflow-core/src/airflow/models/team.py | 3 +-
.../tests/unit/cli/commands/test_team_command.py | 357 +++++++++++++++++++++
4 files changed, 538 insertions(+), 1 deletion(-)
diff --git a/airflow-core/src/airflow/cli/cli_config.py
b/airflow-core/src/airflow/cli/cli_config.py
index 5c8497ead6e..31bfc6b90f0 100644
--- a/airflow-core/src/airflow/cli/cli_config.py
+++ b/airflow-core/src/airflow/cli/cli_config.py
@@ -305,6 +305,8 @@ ARG_VERBOSE = Arg(("-v", "--verbose"), help="Make logging
output more verbose",
ARG_LOCAL = Arg(("-l", "--local"), help="Run the task using the
LocalExecutor", action="store_true")
ARG_POOL = Arg(("--pool",), "Resource pool to use")
+# teams
+ARG_TEAM_NAME = Arg(("name",), help="Team name")
# backfill
ARG_BACKFILL_DAG = Arg(flags=("--dag-id",), help="The dag to backfill.",
required=True)
@@ -1391,6 +1393,26 @@ VARIABLES_COMMANDS = (
args=(ARG_VAR_EXPORT, ARG_VERBOSE),
),
)
+TEAMS_COMMANDS = (
+ ActionCommand(
+ name="create",
+ help="Create a team",
+
func=lazy_load_command("airflow.cli.commands.team_command.team_create"),
+ args=(ARG_TEAM_NAME, ARG_VERBOSE),
+ ),
+ ActionCommand(
+ name="delete",
+ help="Delete a team",
+
func=lazy_load_command("airflow.cli.commands.team_command.team_delete"),
+ args=(ARG_TEAM_NAME, ARG_YES, ARG_VERBOSE),
+ ),
+ ActionCommand(
+ name="list",
+ help="List teams",
+ func=lazy_load_command("airflow.cli.commands.team_command.team_list"),
+ args=(ARG_OUTPUT, ARG_VERBOSE),
+ ),
+)
DB_COMMANDS = (
ActionCommand(
name="check-migrations",
@@ -1835,6 +1857,11 @@ core_commands: list[CLICommand] = [
help="Manage variables",
subcommands=VARIABLES_COMMANDS,
),
+ GroupCommand(
+ name="teams",
+ help="Manage teams",
+ subcommands=TEAMS_COMMANDS,
+ ),
GroupCommand(
name="jobs",
help="Manage jobs",
diff --git a/airflow-core/src/airflow/cli/commands/team_command.py
b/airflow-core/src/airflow/cli/commands/team_command.py
new file mode 100644
index 00000000000..fd07f5e3bf6
--- /dev/null
+++ b/airflow-core/src/airflow/cli/commands/team_command.py
@@ -0,0 +1,152 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Teams sub-commands."""
+
+from __future__ import annotations
+
+from sqlalchemy import func, select
+from sqlalchemy.exc import IntegrityError
+
+from airflow.cli.simple_table import AirflowConsole
+from airflow.models.connection import Connection
+from airflow.models.pool import Pool
+from airflow.models.team import Team, dag_bundle_team_association_table
+from airflow.models.variable import Variable
+from airflow.utils import cli as cli_utils
+from airflow.utils.providers_configuration_loader import
providers_configuration_loaded
+from airflow.utils.session import NEW_SESSION, provide_session
+
+NO_TEAMS_LIST_MSG = "No teams found."
+
+
+def _show_teams(teams, output):
+ """Display teams in the specified output format."""
+ AirflowConsole().print_as(
+ data=teams,
+ output=output,
+ mapper=lambda x: {
+ "id": str(x.id),
+ "name": x.name,
+ },
+ )
+
+
+def _extract_team_name(args):
+ """Extract and validate team name from args."""
+ team_name = args.name.strip()
+ if not team_name:
+ raise SystemExit("Team name cannot be empty")
+ return team_name
+
+
+@cli_utils.action_cli
+@providers_configuration_loaded
+@provide_session
+def team_create(args, session=NEW_SESSION):
+ """Create a new team."""
+ team_name = _extract_team_name(args)
+
+ # Check if team with this name already exists
+ if session.scalar(select(Team).where(Team.name == team_name)):
+ raise SystemExit(f"Team with name '{team_name}' already exists")
+
+ # Create new team (UUID will be auto-generated by the database)
+ new_team = Team(name=team_name)
+
+ try:
+ session.add(new_team)
+ session.commit()
+ print(f"Team '{team_name}' created successfully with ID:
{new_team.id}")
+ except IntegrityError as e:
+ session.rollback()
+ raise SystemExit(f"Failed to create team '{team_name}': {e}")
+
+
+@cli_utils.action_cli
+@providers_configuration_loaded
+@provide_session
+def team_delete(args, session=NEW_SESSION):
+ """Delete a team after checking for associations."""
+ team_name = _extract_team_name(args)
+
+ # Find the team
+ team = session.scalar(select(Team).where(Team.name == team_name))
+ if not team:
+ raise SystemExit(f"Team '{team_name}' does not exist")
+
+ # Check for associations
+ associations = []
+
+ # Check DAG bundle associations
+ dag_bundle_count = session.scalar(
+ select(func.count())
+ .select_from(dag_bundle_team_association_table)
+ .where(dag_bundle_team_association_table.c.team_id == team.id)
+ )
+ if dag_bundle_count:
+ associations.append(f"{dag_bundle_count} DAG bundle(s)")
+
+ # Check connection associations
+ if connection_count := session.scalar(
+ select(func.count(Connection.id)).where(Connection.team_id == team.id)
+ ):
+ associations.append(f"{connection_count} connection(s)")
+
+ # Check variable associations
+ if variable_count :=
session.scalar(select(func.count(Variable.id)).where(Variable.team_id ==
team.id)):
+ associations.append(f"{variable_count} variable(s)")
+
+ # Check pool associations
+ if pool_count :=
session.scalar(select(func.count(Pool.id)).where(Pool.team_id == team.id)):
+ associations.append(f"{pool_count} pool(s)")
+
+ # If there are associations, prevent deletion
+ if associations:
+ association_list = ", ".join(associations)
+ raise SystemExit(
+ f"Cannot delete team '{team_name}' because it is associated with:
{association_list}. "
+ f"Please remove these associations first."
+ )
+
+ # Confirm deletion if not using --yes flag
+ if not args.yes:
+ confirmation = input(f"Are you sure you want to delete team
'{team_name}'? (y/N): ")
+ if confirmation.upper() != "Y":
+ print("Team deletion cancelled")
+ return
+
+ # Delete the team
+ try:
+ session.delete(team)
+ session.commit()
+ print(f"Team '{team_name}' deleted successfully")
+ except Exception as e:
+ session.rollback()
+ raise SystemExit(f"Failed to delete team '{team_name}': {e}")
+
+
+@cli_utils.action_cli
+@providers_configuration_loaded
+@provide_session
+def team_list(args, session=NEW_SESSION):
+ """List all teams."""
+ teams = session.scalars(select(Team).order_by(Team.name)).all()
+ if not teams:
+ print(NO_TEAMS_LIST_MSG)
+ else:
+ _show_teams(teams=teams, output=args.output)
diff --git a/airflow-core/src/airflow/models/team.py
b/airflow-core/src/airflow/models/team.py
index b591903cf20..f25502d4cda 100644
--- a/airflow-core/src/airflow/models/team.py
+++ b/airflow-core/src/airflow/models/team.py
@@ -17,6 +17,7 @@
# under the License.
from __future__ import annotations
+import uuid6
from sqlalchemy import Column, ForeignKey, Index, String, Table
from sqlalchemy.orm import relationship
from sqlalchemy_utils import UUIDType
@@ -42,7 +43,7 @@ class Team(Base):
__tablename__ = "team"
- id = Column(UUIDType(binary=False), primary_key=True, nullable=False)
+ id = Column(UUIDType(binary=False), primary_key=True, default=uuid6.uuid7)
name = Column(String(50), unique=True, nullable=False)
dag_bundles = relationship(
"DagBundleModel", secondary=dag_bundle_team_association_table,
back_populates="teams"
diff --git a/airflow-core/tests/unit/cli/commands/test_team_command.py
b/airflow-core/tests/unit/cli/commands/test_team_command.py
new file mode 100644
index 00000000000..a06a974136a
--- /dev/null
+++ b/airflow-core/tests/unit/cli/commands/test_team_command.py
@@ -0,0 +1,357 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import uuid
+from unittest.mock import patch
+
+import pytest
+
+from airflow import models, settings
+from airflow.cli import cli_parser
+from airflow.cli.commands import team_command
+from airflow.models import Connection, Pool, Variable
+from airflow.models.dagbundle import DagBundleModel
+from airflow.models.team import Team, dag_bundle_team_association_table
+from airflow.settings import Session
+
+from tests_common.test_utils.db import (
+ clear_db_connections,
+ clear_db_dag_bundles,
+ clear_db_pools,
+ clear_db_teams,
+ clear_db_variables,
+)
+
+pytestmark = pytest.mark.db_test
+
+
+class TestCliTeams:
+ @classmethod
+ def _cleanup(cls):
+ clear_db_connections(add_default_connections_back=False)
+ clear_db_variables()
+ clear_db_pools()
+ clear_db_dag_bundles()
+ clear_db_teams()
+
+ @classmethod
+ def setup_class(cls):
+ cls.dagbag = models.DagBag(include_examples=True)
+ cls.parser = cli_parser.get_parser()
+ settings.configure_orm()
+ cls.session = Session
+ cls._cleanup()
+
+ def teardown_method(self):
+ """Called after each test method."""
+ self._cleanup()
+
+ def test_team_create_success(self, stdout_capture):
+ """Test successful team creation."""
+ with stdout_capture as stdout:
+ team_command.team_create(self.parser.parse_args(["teams",
"create", "test-team"]))
+
+ # Verify team was created in database
+ team = self.session.query(Team).filter(Team.name ==
"test-team").first()
+ assert team is not None
+ assert team.name == "test-team"
+ assert isinstance(team.id, uuid.UUID)
+
+ # Verify output message
+ output = stdout.getvalue()
+ assert "Team 'test-team' created successfully" in output
+ assert str(team.id) in output
+
+ def test_team_create_empty_name(self):
+ """Test team creation with empty name."""
+ with pytest.raises(SystemExit, match="Team name cannot be empty"):
+ team_command.team_create(self.parser.parse_args(["teams",
"create", ""]))
+
+ def test_team_create_whitespace_name(self):
+ """Test team creation with whitespace-only name."""
+ with pytest.raises(SystemExit, match="Team name cannot be empty"):
+ team_command.team_create(self.parser.parse_args(["teams",
"create", " "]))
+
+ def test_team_create_duplicate_name(self):
+ """Test team creation with duplicate name."""
+ # Create first team
+ team_command.team_create(self.parser.parse_args(["teams", "create",
"duplicate-team"]))
+
+ # Try to create team with same name
+ with pytest.raises(SystemExit, match="Team with name 'duplicate-team'
already exists"):
+ team_command.team_create(self.parser.parse_args(["teams",
"create", "duplicate-team"]))
+
+ def test_team_list_empty(self, stdout_capture):
+ """Test listing teams when none exist."""
+ with stdout_capture as stdout:
+ team_command.team_list(self.parser.parse_args(["teams", "list"]))
+
+ # Should not error, just show empty result
+ output = stdout.getvalue()
+ # The exact output format depends on the AirflowConsole implementation
+ # but it should not contain any team names
+ assert team_command.NO_TEAMS_LIST_MSG in output
+
+ def test_team_list_with_teams(self, stdout_capture):
+ """Test listing teams when teams exist."""
+ # Create test teams
+ team_command.team_create(self.parser.parse_args(["teams", "create",
"team-alpha"]))
+ team_command.team_create(self.parser.parse_args(["teams", "create",
"team-beta"]))
+
+ with stdout_capture as stdout:
+ team_command.team_list(self.parser.parse_args(["teams", "list"]))
+
+ output = stdout.getvalue()
+ assert "team-alpha" in output
+ assert "team-beta" in output
+
+ def test_team_list_with_output_format(self):
+ """Test listing teams with different output formats."""
+ # Create a test team
+ team_command.team_create(self.parser.parse_args(["teams", "create",
"format-test"]))
+
+ # Test different output formats
+ team_command.team_list(self.parser.parse_args(["teams", "list",
"--output", "json"]))
+ team_command.team_list(self.parser.parse_args(["teams", "list",
"--output", "yaml"]))
+ team_command.team_list(self.parser.parse_args(["teams", "list",
"--output", "plain"]))
+
+ def test_team_delete_success(self, stdout_capture):
+ """Test successful team deletion."""
+ # Create team first
+ team_command.team_create(self.parser.parse_args(["teams", "create",
"delete-me"]))
+
+ # Verify team exists
+ team = self.session.query(Team).filter(Team.name ==
"delete-me").first()
+ assert team is not None
+
+ # Delete team with --yes flag
+ with stdout_capture as stdout:
+ team_command.team_delete(self.parser.parse_args(["teams",
"delete", "delete-me", "--yes"]))
+
+ # Verify team was deleted
+ team = self.session.query(Team).filter(Team.name ==
"delete-me").first()
+ assert team is None
+
+ # Verify output message
+ output = stdout.getvalue()
+ assert "Team 'delete-me' deleted successfully" in output
+
+ def test_team_delete_nonexistent(self):
+ """Test deleting a team that doesn't exist."""
+ with pytest.raises(SystemExit, match="Team 'nonexistent' does not
exist"):
+ team_command.team_delete(self.parser.parse_args(["teams",
"delete", "nonexistent", "--yes"]))
+
+ def test_team_delete_empty_name(self):
+ """Test deleting team with empty name."""
+ with pytest.raises(SystemExit, match="Team name cannot be empty"):
+ team_command.team_delete(self.parser.parse_args(["teams",
"delete", "", "--yes"]))
+
+ def test_team_delete_with_dag_bundle_association(self):
+ """Test deleting team that has DAG bundle associations."""
+ # Create team
+ team_command.team_create(self.parser.parse_args(["teams", "create",
"bundle-team"]))
+ team = self.session.query(Team).filter(Team.name ==
"bundle-team").first()
+
+ # Create a DAG bundle first
+ dag_bundle = DagBundleModel(name="test-bundle")
+ self.session.add(dag_bundle)
+ self.session.commit()
+
+ # Create a DAG bundle association
+ self.session.execute(
+
dag_bundle_team_association_table.insert().values(dag_bundle_name="test-bundle",
team_id=team.id)
+ )
+ self.session.commit()
+
+ # Try to delete team
+ with pytest.raises(
+ SystemExit,
+ match="Cannot delete team 'bundle-team' because it is associated
with: 1 DAG bundle\\(s\\)",
+ ):
+ team_command.team_delete(self.parser.parse_args(["teams",
"delete", "bundle-team", "--yes"]))
+
+ def test_team_delete_with_connection_association(self):
+ """Test deleting team that has connection associations."""
+ # Create team
+ team_command.team_create(self.parser.parse_args(["teams", "create",
"conn-team"]))
+ team = self.session.query(Team).filter(Team.name ==
"conn-team").first()
+
+ # Create connection associated with team
+ conn = Connection(conn_id="test-conn", conn_type="http",
team_id=team.id)
+ self.session.add(conn)
+ self.session.commit()
+
+ # Try to delete team
+ with pytest.raises(
+ SystemExit,
+ match="Cannot delete team 'conn-team' because it is associated
with: 1 connection\\(s\\)",
+ ):
+ team_command.team_delete(self.parser.parse_args(["teams",
"delete", "conn-team", "--yes"]))
+
+ def test_team_delete_with_variable_association(self):
+ """Test deleting team that has variable associations."""
+ # Create team
+ team_command.team_create(self.parser.parse_args(["teams", "create",
"var-team"]))
+ team = self.session.query(Team).filter(Team.name == "var-team").first()
+
+ # Create variable associated with team
+ var = Variable(key="test-var", val="test-value", team_id=team.id)
+ self.session.add(var)
+ self.session.commit()
+
+ # Try to delete team
+ with pytest.raises(
+ SystemExit, match="Cannot delete team 'var-team' because it is
associated with: 1 variable\\(s\\)"
+ ):
+ team_command.team_delete(self.parser.parse_args(["teams",
"delete", "var-team", "--yes"]))
+
+ def test_team_delete_with_pool_association(self):
+ """Test deleting team that has pool associations."""
+ # Create team
+ team_command.team_create(self.parser.parse_args(["teams", "create",
"pool-team"]))
+ team = self.session.query(Team).filter(Team.name ==
"pool-team").first()
+
+ # Create pool associated with team
+ pool = Pool(
+ pool="test-pool", slots=5, description="Test pool",
include_deferred=False, team_id=team.id
+ )
+ self.session.add(pool)
+ self.session.commit()
+
+ # Try to delete team
+ with pytest.raises(
+ SystemExit, match="Cannot delete team 'pool-team' because it is
associated with: 1 pool\\(s\\)"
+ ):
+ team_command.team_delete(self.parser.parse_args(["teams",
"delete", "pool-team", "--yes"]))
+
+ def test_team_delete_with_multiple_associations(self):
+ """Test deleting team that has multiple types of associations."""
+ # Create team
+ team_command.team_create(self.parser.parse_args(["teams", "create",
"multi-team"]))
+ team = self.session.query(Team).filter(Team.name ==
"multi-team").first()
+
+ # Create a DAG bundle first
+ dag_bundle = DagBundleModel(name="multi-bundle")
+ self.session.add(dag_bundle)
+ self.session.commit()
+
+ # Create multiple associations
+ conn = Connection(conn_id="multi-conn", conn_type="http",
team_id=team.id)
+ var = Variable(key="multi-var", val="value", team_id=team.id)
+ pool = Pool(
+ pool="multi-pool", slots=3, description="Multi pool",
include_deferred=False, team_id=team.id
+ )
+
+ self.session.add_all([conn, var, pool])
+ self.session.execute(
+
dag_bundle_team_association_table.insert().values(dag_bundle_name="multi-bundle",
team_id=team.id)
+ )
+ self.session.commit()
+
+ # Try to delete team
+ with pytest.raises(SystemExit) as exc_info:
+ team_command.team_delete(self.parser.parse_args(["teams",
"delete", "multi-team", "--yes"]))
+
+ error_msg = str(exc_info.value)
+ assert "Cannot delete team 'multi-team' because it is associated
with:" in error_msg
+ assert "1 DAG bundle(s)" in error_msg
+ assert "1 connection(s)" in error_msg
+ assert "1 variable(s)" in error_msg
+ assert "1 pool(s)" in error_msg
+
+ @patch("builtins.input", return_value="Y")
+ def test_team_delete_with_confirmation_yes(self, mock_input,
stdout_capture):
+ """Test team deletion with user confirmation (Yes)."""
+ # Create team
+ team_command.team_create(self.parser.parse_args(["teams", "create",
"confirm-yes"]))
+
+ # Delete without --yes flag (should prompt for confirmation)
+ with stdout_capture as stdout:
+ team_command.team_delete(self.parser.parse_args(["teams",
"delete", "confirm-yes"]))
+
+ # Verify team was deleted
+ team = self.session.query(Team).filter(Team.name ==
"confirm-yes").first()
+ assert team is None
+
+ output = stdout.getvalue()
+ assert "Team 'confirm-yes' deleted successfully" in output
+
+ @patch("builtins.input", return_value="N")
+ def test_team_delete_with_confirmation_no(self, mock_input,
stdout_capture):
+ """Test team deletion with user confirmation (No)."""
+ # Create team
+ team_command.team_create(self.parser.parse_args(["teams", "create",
"confirm-no"]))
+
+ # Delete without --yes flag (should prompt for confirmation)
+ with stdout_capture as stdout:
+ team_command.team_delete(self.parser.parse_args(["teams",
"delete", "confirm-no"]))
+
+ # Verify team was NOT deleted
+ team = self.session.query(Team).filter(Team.name ==
"confirm-no").first()
+ assert team is not None
+
+ output = stdout.getvalue()
+ assert "Team deletion cancelled" in output
+
+ @patch("builtins.input", return_value="invalid")
+ def test_team_delete_with_confirmation_invalid(self, mock_input,
stdout_capture):
+ """Test team deletion with invalid confirmation input."""
+ # Create team
+ team_command.team_create(self.parser.parse_args(["teams", "create",
"confirm-invalid"]))
+
+ # Delete without --yes flag (should prompt for confirmation)
+ with stdout_capture as stdout:
+ team_command.team_delete(self.parser.parse_args(["teams",
"delete", "confirm-invalid"]))
+
+ # Verify team was NOT deleted (invalid input treated as No)
+ team = self.session.query(Team).filter(Team.name ==
"confirm-invalid").first()
+ assert team is not None
+
+ output = stdout.getvalue()
+ assert "Team deletion cancelled" in output
+
+ def test_team_operations_integration(self):
+ """Test integration of create, list, and delete operations."""
+ # Start with empty state
+ teams = self.session.query(Team).all()
+ assert len(teams) == 0
+
+ # Create multiple teams
+ team_command.team_create(self.parser.parse_args(["teams", "create",
"integration-1"]))
+ team_command.team_create(self.parser.parse_args(["teams", "create",
"integration-2"]))
+ team_command.team_create(self.parser.parse_args(["teams", "create",
"integration-3"]))
+
+ # Verify all teams exist
+ teams = self.session.query(Team).all()
+ assert len(teams) == 3
+ team_names = [team.name for team in teams]
+ assert "integration-1" in team_names
+ assert "integration-2" in team_names
+ assert "integration-3" in team_names
+
+ # Delete one team
+ team_command.team_delete(self.parser.parse_args(["teams", "delete",
"integration-2", "--yes"]))
+
+ # Verify correct team was deleted
+ teams = self.session.query(Team).all()
+ assert len(teams) == 2
+ team_names = [team.name for team in teams]
+ assert "integration-1" in team_names
+ assert "integration-2" not in team_names
+ assert "integration-3" in team_names