This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 766be1c2cef Implement `filter_authorized_connections`,
`filter_authorized_pools` and `filter_authorized_variables` in AWS auth manager
(#55687)
766be1c2cef is described below
commit 766be1c2cefc0672228734a24728e6624f1262cf
Author: Vincent <[email protected]>
AuthorDate: Mon Sep 22 09:07:46 2025 -0400
Implement `filter_authorized_connections`, `filter_authorized_pools` and
`filter_authorized_variables` in AWS auth manager (#55687)
---
.../amazon/aws/auth_manager/aws_auth_manager.py | 111 ++++++++++++++++++++-
.../aws/auth_manager/test_aws_auth_manager.py | 74 +++++++++-----
2 files changed, 153 insertions(+), 32 deletions(-)
diff --git
a/providers/amazon/src/airflow/providers/amazon/aws/auth_manager/aws_auth_manager.py
b/providers/amazon/src/airflow/providers/amazon/aws/auth_manager/aws_auth_manager.py
index 913ea04fb92..e9b40bedea0 100644
---
a/providers/amazon/src/airflow/providers/amazon/aws/auth_manager/aws_auth_manager.py
+++
b/providers/amazon/src/airflow/providers/amazon/aws/auth_manager/aws_auth_manager.py
@@ -338,6 +338,37 @@ class AwsAuthManager(BaseAuthManager[AwsAuthManagerUser]):
]
return self.avp_facade.batch_is_authorized(requests=facade_requests,
user=user)
+ def filter_authorized_connections(
+ self,
+ *,
+ conn_ids: set[str],
+ user: AwsAuthManagerUser,
+ method: ResourceMethod = "GET",
+ team_name: str | None = None,
+ ) -> set[str]:
+ requests: dict[str, dict[ResourceMethod, IsAuthorizedRequest]] =
defaultdict(dict)
+ requests_list: list[IsAuthorizedRequest] = []
+ for conn_id in conn_ids:
+ request: IsAuthorizedRequest = {
+ "method": method,
+ "entity_type": AvpEntities.CONNECTION,
+ "entity_id": conn_id,
+ }
+ requests[conn_id][method] = request
+ requests_list.append(request)
+
+ batch_is_authorized_results =
self.avp_facade.get_batch_is_authorized_results(
+ requests=requests_list, user=user
+ )
+
+ return {
+ conn_id
+ for conn_id in conn_ids
+ if self._is_authorized_from_batch_response(
+ batch_is_authorized_results, requests[conn_id][method], user
+ )
+ }
+
def filter_authorized_dag_ids(
self,
*,
@@ -361,13 +392,75 @@ class AwsAuthManager(BaseAuthManager[AwsAuthManagerUser]):
requests=requests_list, user=user
)
- def _has_access_to_dag(request: IsAuthorizedRequest):
- result = self.avp_facade.get_batch_is_authorized_single_result(
- batch_is_authorized_results=batch_is_authorized_results,
request=request, user=user
+ return {
+ dag_id
+ for dag_id in dag_ids
+ if self._is_authorized_from_batch_response(
+ batch_is_authorized_results, requests[dag_id][method], user
)
- return result["decision"] == "ALLOW"
+ }
- return {dag_id for dag_id in dag_ids if
_has_access_to_dag(requests[dag_id][method])}
+ def filter_authorized_pools(
+ self,
+ *,
+ pool_names: set[str],
+ user: AwsAuthManagerUser,
+ method: ResourceMethod = "GET",
+ team_name: str | None = None,
+ ) -> set[str]:
+ requests: dict[str, dict[ResourceMethod, IsAuthorizedRequest]] =
defaultdict(dict)
+ requests_list: list[IsAuthorizedRequest] = []
+ for pool_name in pool_names:
+ request: IsAuthorizedRequest = {
+ "method": method,
+ "entity_type": AvpEntities.POOL,
+ "entity_id": pool_name,
+ }
+ requests[pool_name][method] = request
+ requests_list.append(request)
+
+ batch_is_authorized_results =
self.avp_facade.get_batch_is_authorized_results(
+ requests=requests_list, user=user
+ )
+
+ return {
+ pool_name
+ for pool_name in pool_names
+ if self._is_authorized_from_batch_response(
+ batch_is_authorized_results, requests[pool_name][method], user
+ )
+ }
+
+ def filter_authorized_variables(
+ self,
+ *,
+ variable_keys: set[str],
+ user: AwsAuthManagerUser,
+ method: ResourceMethod = "GET",
+ team_name: str | None = None,
+ ) -> set[str]:
+ requests: dict[str, dict[ResourceMethod, IsAuthorizedRequest]] =
defaultdict(dict)
+ requests_list: list[IsAuthorizedRequest] = []
+ for variable_key in variable_keys:
+ request: IsAuthorizedRequest = {
+ "method": method,
+ "entity_type": AvpEntities.VARIABLE,
+ "entity_id": variable_key,
+ }
+ requests[variable_key][method] = request
+ requests_list.append(request)
+
+ batch_is_authorized_results =
self.avp_facade.get_batch_is_authorized_results(
+ requests=requests_list, user=user
+ )
+
+ return {
+ variable_key
+ for variable_key in variable_keys
+ if self._is_authorized_from_batch_response(
+ batch_is_authorized_results, requests[variable_key][method],
user
+ )
+ }
def get_url_login(self, **kwargs) -> str:
return urljoin(self.apiserver_endpoint,
f"{AUTH_MANAGER_FASTAPI_APP_PREFIX}/login")
@@ -406,6 +499,14 @@ class AwsAuthManager(BaseAuthManager[AwsAuthManagerUser]):
"entity_id": menu_item_text,
}
+ def _is_authorized_from_batch_response(
+ self, batch_is_authorized_results: list[dict], request:
IsAuthorizedRequest, user: AwsAuthManagerUser
+ ):
+ result = self.avp_facade.get_batch_is_authorized_single_result(
+ batch_is_authorized_results=batch_is_authorized_results,
request=request, user=user
+ )
+ return result["decision"] == "ALLOW"
+
def _check_avp_schema_version(self):
if not self.avp_facade.is_policy_store_schema_up_to_date():
self.log.warning(
diff --git
a/providers/amazon/tests/unit/amazon/aws/auth_manager/test_aws_auth_manager.py
b/providers/amazon/tests/unit/amazon/aws/auth_manager/test_aws_auth_manager.py
index 70d5a31986e..39f77fbfb00 100644
---
a/providers/amazon/tests/unit/amazon/aws/auth_manager/test_aws_auth_manager.py
+++
b/providers/amazon/tests/unit/amazon/aws/auth_manager/test_aws_auth_manager.py
@@ -612,81 +612,100 @@ class TestAwsAuthManager:
)
assert result
+ @pytest.mark.parametrize(
+ "get_authorized_method, avp_entity, entities_parameter",
+ [
+ ("filter_authorized_connections", AvpEntities.CONNECTION.value,
"conn_ids"),
+ ("filter_authorized_dag_ids", AvpEntities.DAG.value, "dag_ids"),
+ ("filter_authorized_pools", AvpEntities.POOL.value, "pool_names"),
+ ("filter_authorized_variables", AvpEntities.VARIABLE.value,
"variable_keys"),
+ ],
+ )
@pytest.mark.parametrize(
"method, user, expected_result",
[
- ("GET", AwsAuthManagerUser(user_id="test_user_id1", groups=[]),
{"dag_1"}),
+ ("GET", AwsAuthManagerUser(user_id="test_user_id1", groups=[]),
{"entity_1"}),
("PUT", AwsAuthManagerUser(user_id="test_user_id1", groups=[]),
set()),
("GET", AwsAuthManagerUser(user_id="test_user_id2", groups=[]),
set()),
- ("PUT", AwsAuthManagerUser(user_id="test_user_id2", groups=[]),
{"dag_2"}),
+ ("PUT", AwsAuthManagerUser(user_id="test_user_id2", groups=[]),
{"entity_2"}),
],
)
- def test_filter_authorized_dag_ids(self, method, user, auth_manager,
test_user, expected_result):
- dag_ids = {"dag_1", "dag_2"}
- # test_user_id1 has GET permissions on dag_1
- # test_user_id2 has PUT permissions on dag_2
+ def test_filter_authorized(
+ self,
+ get_authorized_method,
+ avp_entity,
+ entities_parameter,
+ method,
+ user,
+ auth_manager,
+ test_user,
+ expected_result,
+ ):
+ entity_ids = {"entity_1", "entity_2"}
+ # test_user_id1 has GET permissions on entity_1
+ # test_user_id2 has PUT permissions on entity_2
batch_is_authorized_output = [
{
"request": {
"principal": {"entityType": "Airflow::User", "entityId":
"test_user_id1"},
- "action": {"actionType": "Airflow::Action", "actionId":
"Dag.GET"},
- "resource": {"entityType": "Airflow::Dag", "entityId":
"dag_1"},
+ "action": {"actionType": "Airflow::Action", "actionId":
f"{avp_entity}.GET"},
+ "resource": {"entityType": f"Airflow::{avp_entity}",
"entityId": "entity_1"},
},
"decision": "ALLOW",
},
{
"request": {
"principal": {"entityType": "Airflow::User", "entityId":
"test_user_id1"},
- "action": {"actionType": "Airflow::Action", "actionId":
"Dag.PUT"},
- "resource": {"entityType": "Airflow::Dag", "entityId":
"dag_1"},
+ "action": {"actionType": "Airflow::Action", "actionId":
f"{avp_entity}.PUT"},
+ "resource": {"entityType": f"Airflow::{avp_entity}",
"entityId": "entity_1"},
},
"decision": "DENY",
},
{
"request": {
"principal": {"entityType": "Airflow::User", "entityId":
"test_user_id1"},
- "action": {"actionType": "Airflow::Action", "actionId":
"Dag.GET"},
- "resource": {"entityType": "Airflow::Dag", "entityId":
"dag_2"},
+ "action": {"actionType": "Airflow::Action", "actionId":
f"{avp_entity}.GET"},
+ "resource": {"entityType": f"Airflow::{avp_entity}",
"entityId": "entity_2"},
},
"decision": "DENY",
},
{
"request": {
"principal": {"entityType": "Airflow::User", "entityId":
"test_user_id1"},
- "action": {"actionType": "Airflow::Action", "actionId":
"Dag.PUT"},
- "resource": {"entityType": "Airflow::Dag", "entityId":
"dag_2"},
+ "action": {"actionType": "Airflow::Action", "actionId":
f"{avp_entity}.PUT"},
+ "resource": {"entityType": f"Airflow::{avp_entity}",
"entityId": "entity_2"},
},
"decision": "DENY",
},
{
"request": {
"principal": {"entityType": "Airflow::User", "entityId":
"test_user_id2"},
- "action": {"actionType": "Airflow::Action", "actionId":
"Dag.GET"},
- "resource": {"entityType": "Airflow::Dag", "entityId":
"dag_1"},
+ "action": {"actionType": "Airflow::Action", "actionId":
f"{avp_entity}.GET"},
+ "resource": {"entityType": f"Airflow::{avp_entity}",
"entityId": "entity_1"},
},
"decision": "DENY",
},
{
"request": {
"principal": {"entityType": "Airflow::User", "entityId":
"test_user_id2"},
- "action": {"actionType": "Airflow::Action", "actionId":
"Dag.PUT"},
- "resource": {"entityType": "Airflow::Dag", "entityId":
"dag_1"},
+ "action": {"actionType": "Airflow::Action", "actionId":
f"{avp_entity}.PUT"},
+ "resource": {"entityType": f"Airflow::{avp_entity}",
"entityId": "entity_1"},
},
"decision": "DENY",
},
{
"request": {
"principal": {"entityType": "Airflow::User", "entityId":
"test_user_id2"},
- "action": {"actionType": "Airflow::Action", "actionId":
"Dag.GET"},
- "resource": {"entityType": "Airflow::Dag", "entityId":
"dag_2"},
+ "action": {"actionType": "Airflow::Action", "actionId":
f"{avp_entity}.GET"},
+ "resource": {"entityType": f"Airflow::{avp_entity}",
"entityId": "entity_2"},
},
"decision": "DENY",
},
{
"request": {
"principal": {"entityType": "Airflow::User", "entityId":
"test_user_id2"},
- "action": {"actionType": "Airflow::Action", "actionId":
"Dag.PUT"},
- "resource": {"entityType": "Airflow::Dag", "entityId":
"dag_2"},
+ "action": {"actionType": "Airflow::Action", "actionId":
f"{avp_entity}.PUT"},
+ "resource": {"entityType": f"Airflow::{avp_entity}",
"entityId": "entity_2"},
},
"decision": "ALLOW",
},
@@ -695,11 +714,12 @@ class TestAwsAuthManager:
return_value=batch_is_authorized_output
)
- result = auth_manager.filter_authorized_dag_ids(
- dag_ids=dag_ids,
- method=method,
- user=user,
- )
+ params = {
+ entities_parameter: entity_ids,
+ "method": method,
+ "user": user,
+ }
+ result = getattr(auth_manager, get_authorized_method)(**params)
auth_manager.avp_facade.get_batch_is_authorized_results.assert_called()
assert result == expected_result