o-nikolas commented on code in PR #55306:
URL: https://github.com/apache/airflow/pull/55306#discussion_r2326108782


##########
airflow-core/src/airflow/api_fastapi/core_api/security.py:
##########
@@ -255,21 +255,28 @@ def inner(
         request: BulkBody[PoolBody],
         user: GetUserDep,
     ) -> None:
+        existing_pool_names = [
+            cast("str", entity) if action.action == BulkAction.DELETE else cast("PoolBody", entity).pool
+            for action in request.actions
+            for entity in action.entities
+            if action.action != BulkAction.CREATE
+        ]
+        teams = Pool.get_bulk_team_name(existing_pool_names)
+
         requests: list[IsAuthorizedPoolRequest] = []
         for action in request.actions:
-            requests.extend(
-                [
-                    {
-                        "method": MAP_BULK_ACTION_TO_AUTH_METHOD[action.action],
-                        "details": PoolDetails(
-                            name=cast("str", pool)
-                            if action.action == BulkAction.DELETE
-                            else cast("PoolBody", pool).pool
-                        ),
-                    }
-                    for pool in action.entities
-                ]
-            )
+            for pool in action.entities:
+                pool_name = (
+                    cast("str", pool) if action.action == BulkAction.DELETE else cast("PoolBody", pool).pool
+                )
+                req: IsAuthorizedPoolRequest = {
+                    "method": MAP_BULK_ACTION_TO_AUTH_METHOD[action.action],
+                    "details": PoolDetails(
+                        name=pool_name,
+                        team_name=teams.get(pool_name),
+                    ),
+                }
+                requests.append(req)

Review Comment:
   I know this logic is probably quite mechanical and obvious if you're used to 
this area of the codebase, but I find it a bit hard to read and review. 
Would it be worth adding more comments to this logic going forward?
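
As an illustration of the kind of commenting meant here, the following is a minimal sketch of the same three steps with explanatory comments. It uses stand-in types rather than the real Airflow classes: `Action`, `pool_name_of`, `build_requests` and the `lookup_teams` callable are hypothetical names, `lookup_teams` stands in for `Pool.get_bulk_team_name`, and the `"method"` value is simplified instead of going through `MAP_BULK_ACTION_TO_AUTH_METHOD`.

```python
from dataclasses import dataclass
from enum import Enum


class BulkAction(str, Enum):  # stand-in for Airflow's BulkAction
    CREATE = "create"
    UPDATE = "update"
    DELETE = "delete"


@dataclass
class PoolBody:  # stand-in: only the field used in this sketch
    pool: str


@dataclass
class Action:  # hypothetical stand-in for one bulk action in the request
    action: BulkAction
    entities: list


def pool_name_of(action: Action, entity) -> str:
    """Hypothetical helper: DELETE entities are bare pool names,
    CREATE/UPDATE entities are bodies that carry the name in `.pool`."""
    return entity if action.action == BulkAction.DELETE else entity.pool


def build_requests(actions, lookup_teams):
    # Step 1: collect the names of pools that should already exist.
    # CREATE is skipped because a pool being created has no stored team yet.
    existing_pool_names = [
        pool_name_of(action, entity)
        for action in actions
        if action.action != BulkAction.CREATE
        for entity in action.entities
    ]
    # Step 2: resolve all of those names to team names in a single call.
    pool_to_team = lookup_teams(existing_pool_names)
    # Step 3: build one authorization request per entity, for every action.
    return [
        {
            "method": action.action.value,  # simplified stand-in
            "name": pool_name_of(action, entity),
            "team_name": pool_to_team.get(pool_name_of(action, entity)),
        }
        for action in actions
        for entity in action.entities
    ]
```

Pulling the DELETE-versus-body distinction into one helper also removes the duplicated `cast(...)` expression, which is most of what makes the original hard to scan.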



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/import_error.py:
##########
@@ -179,19 +179,21 @@ def get_import_errors(
         limit=limit,
         session=session,
     )
-    import_errors_result: Iterable[tuple[ParseImportError, Iterable[str]]] = groupby(
+    import_errors_result: Iterable[tuple[ParseImportError, Iterable]] = groupby(
         session.execute(import_errors_select), itemgetter(0)
     )
 
     import_errors = []
     for import_error, file_dag_ids in import_errors_result:
+        dag_ids = [dag_id for _, dag_id in file_dag_ids]
+        teams = DagModel.get_bulk_team_name(dag_ids, session=session)

Review Comment:
   This is really a `dag_id_to_team` mapping, right? I think a better name 
(I know, naming is hard 🙁) would make this and the other instances below more 
readable.
   
   Also, every dag from the same dag bundle that is owned by a team will be 
accessible from that team. Do we need to worry about it at the dag file level? 
Or is that hard to change at this point?
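
To make the naming point concrete, here is a small sketch of the grouping step with the mapping named for what it holds. The rows and `lookup_dag_id_to_team` are hypothetical stand-ins (the latter for `DagModel.get_bulk_team_name`); the sketch assumes the rows arrive already ordered by import error, since `groupby` only groups consecutive items.

```python
from itertools import groupby
from operator import itemgetter

# Stand-in rows shaped like (import_error, dag_id), ordered by import_error.
rows = [("err_1", "dag_a"), ("err_1", "dag_b"), ("err_2", "dag_c")]


def lookup_dag_id_to_team(dag_ids):
    """Hypothetical stand-in for the bulk lookup; returns dag_id -> team name."""
    known = {"dag_a": "team_x", "dag_c": "team_y"}
    return {dag_id: known.get(dag_id) for dag_id in dag_ids}


teams_by_error = {}
for import_error, error_rows in groupby(rows, key=itemgetter(0)):
    # Each grouped item is still a full row, so unpack the dag_id from it.
    dag_ids = [dag_id for _, dag_id in error_rows]
    # The name states the key and the value, unlike a bare `teams`.
    dag_id_to_team = lookup_dag_id_to_team(dag_ids)
    teams_by_error[import_error] = dag_id_to_team
```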



##########
airflow-core/src/airflow/models/connection.py:
##########
@@ -598,3 +598,13 @@ def get_team_name(connection_id: str, session=NEW_SESSION) -> str | None:
             .where(Connection.conn_id == connection_id)
         )
         return session.scalar(stmt)
+
+    @staticmethod
+    @provide_session
+    def get_bulk_team_name(connection_ids: list[str], session=NEW_SESSION) -> dict[str, str | None]:

Review Comment:
   I think this is a confusing method name. It really returns a mapping from 
connection ID to team name. That could be more generally useful elsewhere, but 
the current name is somewhat hard-coded to the current usage.



##########
airflow-core/tests/unit/api_fastapi/core_api/test_security.py:
##########
@@ -106,7 +106,7 @@ async def test_get_user_expired_token(self, mock_get_auth_manager):
 
     @pytest.mark.db_test
     @patch("airflow.api_fastapi.core_api.security.get_auth_manager")
-    async def test_requires_access_dag_authorized(self, mock_get_auth_manager):

Review Comment:
   Why was `async def` replaced with plain `def` in all these cases? Is that 
related to the primary changes in this PR?



##########
airflow-core/src/airflow/api_fastapi/core_api/security.py:
##########
@@ -255,21 +255,28 @@ def inner(
         request: BulkBody[PoolBody],
         user: GetUserDep,
     ) -> None:
+        existing_pool_names = [
+            cast("str", entity) if action.action == BulkAction.DELETE else cast("PoolBody", entity).pool
+            for action in request.actions
+            for entity in action.entities
+            if action.action != BulkAction.CREATE
+        ]
+        teams = Pool.get_bulk_team_name(existing_pool_names)
+
         requests: list[IsAuthorizedPoolRequest] = []
         for action in request.actions:
-            requests.extend(
-                [
-                    {
-                        "method": MAP_BULK_ACTION_TO_AUTH_METHOD[action.action],
-                        "details": PoolDetails(
-                            name=cast("str", pool)
-                            if action.action == BulkAction.DELETE
-                            else cast("PoolBody", pool).pool
-                        ),
-                    }
-                    for pool in action.entities
-                ]
-            )
+            for pool in action.entities:
+                pool_name = (
+                    cast("str", pool) if action.action == BulkAction.DELETE else cast("PoolBody", pool).pool
+                )
+                req: IsAuthorizedPoolRequest = {
+                    "method": MAP_BULK_ACTION_TO_AUTH_METHOD[action.action],
+                    "details": PoolDetails(
+                        name=pool_name,
+                        team_name=teams.get(pool_name),

Review Comment:
   Users should also have access to all connections, pools, dags, etc. that are 
_not_ owned by a team (i.e. are global). Is that case still covered by this 
code? I see that the methods that produce the mappings do not include pools 
that are not owned by a team (i.e. a `None` team name). So I'm guessing no?
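
One detail relevant to this question: `teams.get(pool_name)` returns `None` for any name that is missing from the mapping, so a pool the mapping omits still produces `team_name=None` rather than an error. The sketch below shows only that lookup behaviour, with a hypothetical mapping and helper name. Whether the auth manager then treats `None` as "global, accessible to everyone" is not visible in this diff and remains an assumption.

```python
from typing import Optional

# Hypothetical mapping: only team-owned pools appear as keys.
pool_to_team = {"team_a_pool": "team_a"}


def team_name_for(pool_name: str, mapping: dict) -> Optional[str]:
    # dict.get returns None for a missing key instead of raising KeyError,
    # so a pool that is absent from the mapping resolves to None.
    return mapping.get(pool_name)


owned = team_name_for("team_a_pool", pool_to_team)
unowned = team_name_for("default_pool", pool_to_team)
```

A test that sends a bulk request for a pool with no team and asserts on the resulting `team_name` would settle whether the global case is handled as intended.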



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

