This is an automated email from the ASF dual-hosted git repository.
ferruzzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new a5684bf8d03 Check destination team permission when using bulk APIs
with connections, variables and pools (#68573)
a5684bf8d03 is described below
commit a5684bf8d03c311b91df543393692bbf367536c5
Author: Vincent <[email protected]>
AuthorDate: Mon Jun 15 12:23:24 2026 -0400
Check destination team permission when using bulk APIs with connections,
variables and pools (#68573)
---
.../src/airflow/api_fastapi/core_api/security.py | 55 +++++++++---
.../unit/api_fastapi/core_api/test_security.py | 99 ++++++++++++++++++++++
2 files changed, 142 insertions(+), 12 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/security.py b/airflow-core/src/airflow/api_fastapi/core_api/security.py
index 8a009231f12..76f29220309 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/security.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/security.py
@@ -441,6 +441,7 @@ def requires_access_pool_bulk() -> Callable[[BulkBody[PoolBody], BaseUser], None
request: BulkBody[PoolBody],
user: GetUserDep,
) -> None:
+ multi_team = conf.getboolean("core", "multi_team")
# Build the list of pool names provided as part of the request that may correspond to
# an existing resource (UPDATE / DELETE, or CREATE+OVERWRITE which may turn into a PUT).
existing_pool_names = [
@@ -459,9 +460,6 @@ def requires_access_pool_bulk() -> Callable[[BulkBody[PoolBody], BaseUser], None
pool_name = (
cast("str", pool) if action.action == BulkAction.DELETE
else cast("PoolBody", pool).pool
)
- # For each pool, build a `IsAuthorizedPoolRequest`
- # The list of `IsAuthorizedPoolRequest` will then be sent using `batch_is_authorized_pool`
- # Each `IsAuthorizedPoolRequest` is similar to calling `is_authorized_pool`
for method in methods:
req: IsAuthorizedPoolRequest = {
"method": method,
@@ -471,9 +469,19 @@ def requires_access_pool_bulk() -> Callable[[BulkBody[PoolBody], BaseUser], None
),
}
requests.append(req)
+ # Authorize the destination team_name when the entity body requests a team change.
+ if multi_team and _bulk_action_sets_team(action):
+ dest_team = cast("PoolBody", pool).team_name
+ if dest_team is not None and dest_team != pool_name_to_team.get(pool_name):
+ for method in methods:
+ requests.append(
+ {
+ "method": method,
+ "details": PoolDetails(name=pool_name, team_name=dest_team),
+ }
+ )
_requires_access(
- # By calling `batch_is_authorized_pool`, we check the user has access to all pools provided in the request
is_authorized_callback=lambda: get_auth_manager().batch_is_authorized_pool(
requests=requests,
user=user,
@@ -543,6 +551,7 @@ def requires_access_connection_bulk() -> Callable[[BulkBody[ConnectionBody], Bas
request: BulkBody[ConnectionBody],
user: GetUserDep,
) -> None:
+ multi_team = conf.getboolean("core", "multi_team")
# Build the list of ``conn_id`` provided as part of the request that may correspond to
# an existing resource (UPDATE / DELETE, or CREATE+OVERWRITE which may turn into a PUT).
existing_connection_ids = [
@@ -565,9 +574,6 @@ def requires_access_connection_bulk() -> Callable[[BulkBody[ConnectionBody], Bas
if action.action == BulkAction.DELETE
else cast("ConnectionBody", connection).connection_id
)
- # For each pool, build a `IsAuthorizedConnectionRequest`
- # The list of `IsAuthorizedConnectionRequest` will then be sent using `batch_is_authorized_connection`
- # Each `IsAuthorizedConnectionRequest` is similar to calling `is_authorized_connection`
for method in methods:
req: IsAuthorizedConnectionRequest = {
"method": method,
@@ -577,9 +583,19 @@ def requires_access_connection_bulk() -> Callable[[BulkBody[ConnectionBody], Bas
),
}
requests.append(req)
+ # Authorize the destination team_name when the entity body requests a team change.
+ if multi_team and _bulk_action_sets_team(action):
+ dest_team = cast("ConnectionBody", connection).team_name
+ if dest_team is not None and dest_team != conn_id_to_team.get(connection_id):
+ for method in methods:
+ requests.append(
+ {
+ "method": method,
+ "details": ConnectionDetails(conn_id=connection_id, team_name=dest_team),
+ }
+ )
_requires_access(
- # By calling `batch_is_authorized_connection`, we check the user has access to all connections provided in the request
is_authorized_callback=lambda: get_auth_manager().batch_is_authorized_connection(
requests=requests,
user=user,
@@ -686,6 +702,7 @@ def requires_access_variable_bulk() -> Callable[[BulkBody[VariableBody], BaseUse
request: BulkBody[VariableBody],
user: GetUserDep,
) -> None:
+ multi_team = conf.getboolean("core", "multi_team")
# Build the list of variable keys provided as part of the request that may correspond to
# an existing resource (UPDATE / DELETE, or CREATE+OVERWRITE which may turn into a PUT).
existing_variable_keys = [
@@ -706,9 +723,6 @@ def requires_access_variable_bulk() -> Callable[[BulkBody[VariableBody], BaseUse
if action.action == BulkAction.DELETE
else cast("VariableBody", variable).key
)
- # For each variable, build a `IsAuthorizedVariableRequest`
- # The list of `IsAuthorizedVariableRequest` will then be sent using `batch_is_authorized_variable`
- # Each `IsAuthorizedVariableRequest` is similar to calling `is_authorized_variable`
for method in methods:
req: IsAuthorizedVariableRequest = {
"method": method,
@@ -718,9 +732,19 @@ def requires_access_variable_bulk() -> Callable[[BulkBody[VariableBody], BaseUse
),
}
requests.append(req)
+ # Authorize the destination team_name when the entity body requests a team change.
+ if multi_team and _bulk_action_sets_team(action):
+ dest_team = cast("VariableBody", variable).team_name
+ if dest_team is not None and dest_team != var_key_to_team.get(variable_key):
+ for method in methods:
+ requests.append(
+ {
+ "method": method,
+ "details": VariableDetails(key=variable_key, team_name=dest_team),
+ }
+ )
_requires_access(
- # By calling `batch_is_authorized_variable`, we check the user has access to all variables provided in the request
is_authorized_callback=lambda: get_auth_manager().batch_is_authorized_variable(
requests=requests,
user=user,
@@ -981,3 +1005,10 @@ def _bulk_action_needs_existing_team_lookup(
if action.action != BulkAction.CREATE:
return True
return action.action_on_existence == BulkActionOnExistence.OVERWRITE
+
+
+def _bulk_action_sets_team(
+ action: BulkCreateAction | BulkUpdateAction | BulkDeleteAction,
+) -> bool:
+ """Return True if this action can write a team_name (UPDATE, or CREATE that carries a body)."""
+ return action.action in (BulkAction.UPDATE, BulkAction.CREATE)
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/test_security.py b/airflow-core/tests/unit/api_fastapi/core_api/test_security.py
index e916bc19e5d..d7614f82539 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/test_security.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/test_security.py
@@ -1266,6 +1266,105 @@ class TestFastApiSecurity:
user=user,
)
+ @patch.object(Pool, "get_name_to_team_name_mapping")
+ @patch("airflow.api_fastapi.core_api.security.get_auth_manager")
+ @conf_vars({("core", "multi_team"): "True"})
+ def test_requires_access_pool_bulk_checks_destination_team(
+ self, mock_get_auth_manager, mock_get_name_to_team_name_mapping
+ ):
+ """Bulk UPDATE that changes team_name must authorize the destination team."""
+ auth_manager = Mock()
+ auth_manager.batch_is_authorized_pool.return_value = True
+ mock_get_auth_manager.return_value = auth_manager
+ mock_get_name_to_team_name_mapping.return_value = {"pool1": "team_b"}
+
+ request = BulkBody[PoolBody].model_validate(
+ {
+ "actions": [
+ {
+ "action": "update",
+ "entities": [{"pool": "pool1", "slots": 5, "team_name": "team_a"}],
+ },
+ ]
+ }
+ )
+ user = Mock()
+ requires_access_pool_bulk()(request, user)
+
+ auth_manager.batch_is_authorized_pool.assert_called_once_with(
+ requests=[
+ {"method": "PUT", "details": PoolDetails(name="pool1", team_name="team_b")},
+ {"method": "PUT", "details": PoolDetails(name="pool1", team_name="team_a")},
+ ],
+ user=user,
+ )
+
+ @patch.object(Connection, "get_conn_id_to_team_name_mapping")
+ @patch("airflow.api_fastapi.core_api.security.get_auth_manager")
+ @conf_vars({("core", "multi_team"): "True"})
+ def test_requires_access_connection_bulk_checks_destination_team(
+ self, mock_get_auth_manager, mock_get_conn_id_to_team_name_mapping
+ ):
+ """Bulk UPDATE that changes team_name must authorize the destination team."""
+ auth_manager = Mock()
+ auth_manager.batch_is_authorized_connection.return_value = True
+ mock_get_auth_manager.return_value = auth_manager
+ mock_get_conn_id_to_team_name_mapping.return_value = {"conn1": "team_b"}
+
+ request = BulkBody[ConnectionBody].model_validate(
+ {
+ "actions": [
+ {
+ "action": "update",
+ "entities": [{"connection_id": "conn1", "conn_type": "http", "team_name": "team_a"}],
+ },
+ ]
+ }
+ )
+ user = Mock()
+ requires_access_connection_bulk()(request, user)
+
+ auth_manager.batch_is_authorized_connection.assert_called_once_with(
+ requests=[
+ {"method": "PUT", "details": ConnectionDetails(conn_id="conn1", team_name="team_b")},
+ {"method": "PUT", "details": ConnectionDetails(conn_id="conn1", team_name="team_a")},
+ ],
+ user=user,
+ )
+
+ @patch.object(Variable, "get_key_to_team_name_mapping")
+ @patch("airflow.api_fastapi.core_api.security.get_auth_manager")
+ @conf_vars({("core", "multi_team"): "True"})
+ def test_requires_access_variable_bulk_checks_destination_team(
+ self, mock_get_auth_manager, mock_get_key_to_team_name_mapping
+ ):
+ """Bulk UPDATE that changes team_name must authorize the destination team."""
+ auth_manager = Mock()
+ auth_manager.batch_is_authorized_variable.return_value = True
+ mock_get_auth_manager.return_value = auth_manager
+ mock_get_key_to_team_name_mapping.return_value = {"var1": "team_b"}
+
+ request = BulkBody[VariableBody].model_validate(
+ {
+ "actions": [
+ {
+ "action": "update",
+ "entities": [{"key": "var1", "value": "val", "team_name": "team_a"}],
+ },
+ ]
+ }
+ )
+ user = Mock()
+ requires_access_variable_bulk()(request, user)
+
+ auth_manager.batch_is_authorized_variable.assert_called_once_with(
+ requests=[
+ {"method": "PUT", "details": VariableDetails(key="var1", team_name="team_b")},
+ {"method": "PUT", "details": VariableDetails(key="var1", team_name="team_a")},
+ ],
+ user=user,
+ )
+
class TestAuthManagerDependency:
"""Test the auth_manager_from_app dependency function."""