This is an automated email from the ASF dual-hosted git repository.

o-nikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 10e5ee47f42 Add --team-name CLI argument to triggerer for multi-team 
deployments (#67254)
10e5ee47f42 is described below

commit 10e5ee47f42732eeb6b0b30413a9354b00f2d6f3
Author: Ramit Kataria <[email protected]>
AuthorDate: Mon May 25 15:33:57 2026 -0700

    Add --team-name CLI argument to triggerer for multi-team deployments 
(#67254)
    
    When core.multi_team is enabled, a triggerer can now be scoped to a
    specific team via `airflow triggerer --team-name <name>`. The argument
    is validated at startup (team must exist, config must be enabled) and
    threaded through TriggererJobRunner → TriggerRunnerSupervisor. Query
    filtering will be wired in a follow-up PR.
---
 airflow-core/src/airflow/cli/cli_config.py         |  5 +++
 .../src/airflow/cli/commands/triggerer_command.py  | 28 ++++++++++--
 .../src/airflow/jobs/triggerer_job_runner.py       |  4 ++
 .../unit/cli/commands/test_triggerer_command.py    | 51 +++++++++++++++++++++-
 airflow-core/tests/unit/jobs/test_triggerer_job.py | 44 +++++++++++++++++++
 5 files changed, 126 insertions(+), 6 deletions(-)

diff --git a/airflow-core/src/airflow/cli/cli_config.py 
b/airflow-core/src/airflow/cli/cli_config.py
index b41ac481e0e..8e701db79bb 100644
--- a/airflow-core/src/airflow/cli/cli_config.py
+++ b/airflow-core/src/airflow/cli/cli_config.py
@@ -1025,6 +1025,10 @@ ARG_QUEUES = Arg(
     type=string_list_type,
     help="Optional comma-separated list of task queues which the triggerer 
should consume from.",
 )
+ARG_TRIGGERER_TEAM_NAME = Arg(
+    ("--team-name",),
+    help="Team name to scope this triggerer to. Requires core.multi_team to be 
enabled.",
+)
 
 ARG_DAG_LIST_COLUMNS = Arg(
     ("--columns",),
@@ -2140,6 +2144,7 @@ core_commands: list[CLICommand] = [
             ARG_SKIP_SERVE_LOGS,
             ARG_DEV,
             ARG_QUEUES,
+            ARG_TRIGGERER_TEAM_NAME,
         ),
     ),
     ActionCommand(
diff --git a/airflow-core/src/airflow/cli/commands/triggerer_command.py 
b/airflow-core/src/airflow/cli/commands/triggerer_command.py
index f35332abd1f..1182fbb05b2 100644
--- a/airflow-core/src/airflow/cli/commands/triggerer_command.py
+++ b/airflow-core/src/airflow/cli/commands/triggerer_command.py
@@ -52,11 +52,15 @@ def _serve_logs(skip_serve_logs: bool = False) -> 
Generator[None, None, None]:
 
 @enable_memray_trace(component=MemrayTraceComponents.triggerer)
 def triggerer_run(
-    skip_serve_logs: bool, capacity: int, triggerer_heartrate: float, queues: 
set[str] | None = None
+    skip_serve_logs: bool,
+    capacity: int,
+    triggerer_heartrate: float,
+    queues: set[str] | None = None,
+    team_name: str | None = None,
 ):
     with _serve_logs(skip_serve_logs):
         triggerer_job_runner = TriggererJobRunner(
-            job=Job(heartrate=triggerer_heartrate), capacity=capacity, 
queues=queues
+            job=Job(heartrate=triggerer_heartrate), capacity=capacity, 
queues=queues, team_name=team_name
         )
         run_job(job=triggerer_job_runner.job, 
execute_callable=triggerer_job_runner._execute)
 
@@ -75,6 +79,18 @@ def triggerer(args):
             "--queues option may only be used when triggerer.queues_enabled is 
`True`."
         )
 
+    multi_team = conf.getboolean("core", "multi_team")
+    team_name: str | None = getattr(args, "team_name", None)
+
+    if team_name and not multi_team:
+        raise AirflowConfigException("--team-name option may only be used when 
core.multi_team is enabled.")
+
+    if team_name:
+        from airflow.models.team import Team
+
+        if Team.get_name_if_exists(team_name) is None:
+            raise AirflowConfigException(f"Team {team_name!r} does not exist.")
+
     queues = set(args.queues) if args.queues else None
     triggerer_heartrate = conf.getfloat("triggerer", "JOB_HEARTBEAT_SEC")
 
@@ -82,7 +98,9 @@ def triggerer(args):
         from airflow.cli.hot_reload import run_with_reloader
 
         run_with_reloader(
-            lambda: triggerer_run(args.skip_serve_logs, args.capacity, 
triggerer_heartrate, queues),
+            lambda: triggerer_run(
+                args.skip_serve_logs, args.capacity, triggerer_heartrate, 
queues, team_name
+            ),
             process_name="triggerer",
         )
         return
@@ -90,6 +108,8 @@ def triggerer(args):
     run_command_with_daemon_option(
         args=args,
         process_name="triggerer",
-        callback=lambda: triggerer_run(args.skip_serve_logs, args.capacity, 
triggerer_heartrate, queues),
+        callback=lambda: triggerer_run(
+            args.skip_serve_logs, args.capacity, triggerer_heartrate, queues, 
team_name
+        ),
         should_setup_logging=True,
     )
diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py 
b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
index c73dfd857bd..14c522d9c29 100644
--- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py
+++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py
@@ -182,6 +182,7 @@ class TriggererJobRunner(BaseJobRunner, LoggingMixin):
         job: Job,
         capacity=None,
         queues: set[str] | None = None,
+        team_name: str | None = None,
     ):
         super().__init__(job)
         if capacity is None:
@@ -191,6 +192,7 @@ class TriggererJobRunner(BaseJobRunner, LoggingMixin):
         else:
             raise ValueError(f"Capacity number {capacity!r} is invalid")
         self.queues = queues
+        self.team_name = team_name
 
     def register_signals(self) -> None:
         """Register signals that stop child processes."""
@@ -241,6 +243,7 @@ class TriggererJobRunner(BaseJobRunner, LoggingMixin):
                 capacity=self.capacity,
                 logger=log,
                 queues=self.queues,
+                team_name=self.team_name,
             )
 
             # Run the main DB comms loop in this process
@@ -428,6 +431,7 @@ class TriggerRunnerSupervisor(WatchedSubprocess):
     job: Job | None = None
     capacity: int
     queues: set[str] | None = None
+    team_name: str | None = None
 
     health_check_threshold = conf.getint("triggerer", 
"triggerer_health_check_threshold")
     runner_health_check_threshold = conf.getfloat("triggerer", 
"runner_health_check_threshold")
diff --git a/airflow-core/tests/unit/cli/commands/test_triggerer_command.py 
b/airflow-core/tests/unit/cli/commands/test_triggerer_command.py
index fb8f6b347d3..0cc9a10b52a 100644
--- a/airflow-core/tests/unit/cli/commands/test_triggerer_command.py
+++ b/airflow-core/tests/unit/cli/commands/test_triggerer_command.py
@@ -51,7 +51,9 @@ class TestTriggererCommand:
         triggerer_command.triggerer(args)
         mock_serve.return_value.__enter__.assert_called_once()
         mock_serve.return_value.__exit__.assert_called_once()
-        mock_triggerer_job_runner.assert_called_once_with(job=mock.ANY, 
capacity=42, queues=None)
+        mock_triggerer_job_runner.assert_called_once_with(
+            job=mock.ANY, capacity=42, queues=None, team_name=None
+        )
 
     @conf_vars({("triggerer", "queues_enabled"): "True"})
     @mock.patch("airflow.cli.commands.triggerer_command.TriggererJobRunner")
@@ -64,7 +66,7 @@ class TestTriggererCommand:
         mock_serve.return_value.__enter__.assert_called_once()
         mock_serve.return_value.__exit__.assert_called_once()
         mock_triggerer_job_runner.assert_called_once_with(
-            job=mock.ANY, capacity=4, queues=set(["my_queue", "other_queue"])
+            job=mock.ANY, capacity=4, queues=set(["my_queue", "other_queue"]), 
team_name=None
         )
 
     @mock.patch("airflow.cli.commands.triggerer_command.TriggererJobRunner")
@@ -92,3 +94,48 @@ class TestTriggererCommand:
         mock_reloader.assert_called_once()
         # The callback function should be callable
         assert callable(mock_reloader.call_args[0][0])
+
+    @conf_vars({("core", "multi_team"): "False"})
+    def test_team_name_rejected_when_multi_team_disabled(self):
+        """--team-name should raise when core.multi_team is disabled"""
+        from airflow.exceptions import AirflowConfigException
+
+        args = self.parser.parse_args(["triggerer", "--team-name", "team_a"])
+        with pytest.raises(AirflowConfigException, 
match="--team-name.*core.multi_team"):
+            triggerer_command.triggerer(args)
+
+    @conf_vars({("core", "multi_team"): "True"})
+    @mock.patch("airflow.models.team.Team.get_name_if_exists", 
return_value=None)
+    def test_team_name_rejected_when_team_does_not_exist(self, mock_get_team):
+        """--team-name should raise when the specified team doesn't exist in 
DB"""
+        from airflow.exceptions import AirflowConfigException
+
+        args = self.parser.parse_args(["triggerer", "--team-name", 
"nonexistent_team"])
+        with pytest.raises(AirflowConfigException, match="does not exist"):
+            triggerer_command.triggerer(args)
+        mock_get_team.assert_called_once_with("nonexistent_team")
+
+    @conf_vars({("core", "multi_team"): "True"})
+    @mock.patch("airflow.models.team.Team.get_name_if_exists", 
return_value="team_a")
+    @mock.patch("airflow.cli.commands.triggerer_command.TriggererJobRunner")
+    @mock.patch("airflow.cli.commands.triggerer_command._serve_logs")
+    def test_team_name_passed_through(self, mock_serve, 
mock_triggerer_job_runner, mock_get_team):
+        """--team-name should be passed to TriggererJobRunner when valid"""
+        mock_triggerer_job_runner.return_value.job_type = "TriggererJob"
+        args = self.parser.parse_args(["triggerer", "--team-name", "team_a"])
+        triggerer_command.triggerer(args)
+        mock_triggerer_job_runner.assert_called_once_with(
+            job=mock.ANY, capacity=mock.ANY, queues=None, team_name="team_a"
+        )
+
+    @conf_vars({("core", "multi_team"): "False"})
+    @mock.patch("airflow.cli.commands.triggerer_command.TriggererJobRunner")
+    @mock.patch("airflow.cli.commands.triggerer_command._serve_logs")
+    def test_no_team_name_passes_none(self, mock_serve, 
mock_triggerer_job_runner):
+        """Without --team-name, team_name=None is passed"""
+        mock_triggerer_job_runner.return_value.job_type = "TriggererJob"
+        args = self.parser.parse_args(["triggerer"])
+        triggerer_command.triggerer(args)
+        mock_triggerer_job_runner.assert_called_once_with(
+            job=mock.ANY, capacity=mock.ANY, queues=None, team_name=None
+        )
diff --git a/airflow-core/tests/unit/jobs/test_triggerer_job.py 
b/airflow-core/tests/unit/jobs/test_triggerer_job.py
index 73643ab3867..3d3a399c121 100644
--- a/airflow-core/tests/unit/jobs/test_triggerer_job.py
+++ b/airflow-core/tests/unit/jobs/test_triggerer_job.py
@@ -200,6 +200,14 @@ def test_capacity_decode():
             TriggererJobRunner(job=job, capacity=input_str)
 
 
[email protected]("team_name", ["team_a", None])
+def test_triggerer_job_runner_stores_team_name(team_name):
+    """TriggererJobRunner stores team_name as-is (validated at CLI layer)."""
+    job = Job()
+    runner = TriggererJobRunner(job, capacity=10, team_name=team_name)
+    assert runner.team_name == team_name
+
+
 @pytest.fixture
 def supervisor_builder(mocker, session):
     def builder(job=None):
@@ -236,6 +244,42 @@ def supervisor_builder(mocker, session):
     return builder
 
 
+def test_supervisor_stores_team_name(supervisor_builder, mocker, session):
+    """TriggerRunnerSupervisor stores team_name field."""
+    job = Job()
+    session.add(job)
+    session.flush()
+
+    import psutil
+
+    process = mocker.Mock(spec=psutil.Process, pid=99)
+    mock_stdin = mocker.Mock(spec=socket)
+
+    proc = TriggerRunnerSupervisor(
+        process_log=mocker.Mock(spec=FilteringBoundLogger),
+        id=job.id,
+        job=job,
+        pid=process.pid,
+        stdin=mock_stdin,
+        process=process,
+        capacity=10,
+        team_name="team_x",
+    )
+    assert proc.team_name == "team_x"
+
+    proc_global = TriggerRunnerSupervisor(
+        process_log=mocker.Mock(spec=FilteringBoundLogger),
+        id=job.id,
+        job=job,
+        pid=process.pid,
+        stdin=mock_stdin,
+        process=process,
+        capacity=10,
+        team_name=None,
+    )
+    assert proc_global.team_name is None
+
+
 def test_run_invokes_seams_in_order(supervisor_builder, mocker):
     """run() enters run_context, drives run_once while not should_stop, then 
exits run_context."""
     from contextlib import contextmanager

Reply via email to