potiuk commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1355994191


##########
airflow/www/security_manager.py:
##########
@@ -269,125 +272,47 @@ def get_user_roles(user=None):
             user = g.user
         return user.roles
 
-    def get_readable_dags(self, user) -> Iterable[DagModel]:
-        """Gets the DAGs readable by authenticated user."""
-        warnings.warn(
-            "`get_readable_dags` has been deprecated. Please use 
`get_readable_dag_ids` instead.",
-            RemovedInAirflow3Warning,
-            stacklevel=2,
-        )
-        with warnings.catch_warnings():
-            warnings.simplefilter("ignore", RemovedInAirflow3Warning)
-            return self.get_accessible_dags([permissions.ACTION_CAN_READ], 
user)
-
-    def get_editable_dags(self, user) -> Iterable[DagModel]:
-        """Gets the DAGs editable by authenticated user."""
-        warnings.warn(
-            "`get_editable_dags` has been deprecated. Please use 
`get_editable_dag_ids` instead.",
-            RemovedInAirflow3Warning,
-            stacklevel=2,
-        )
-        with warnings.catch_warnings():
-            warnings.simplefilter("ignore", RemovedInAirflow3Warning)
-            return self.get_accessible_dags([permissions.ACTION_CAN_EDIT], 
user)
-
-    @provide_session
-    def get_accessible_dags(
-        self,
-        user_actions: Container[str] | None,
-        user,
-        session: Session = NEW_SESSION,
-    ) -> Iterable[DagModel]:
-        warnings.warn(
-            "`get_accessible_dags` has been deprecated. Please use 
`get_accessible_dag_ids` instead.",
-            RemovedInAirflow3Warning,
-            stacklevel=3,
-        )
-        dag_ids = self.get_accessible_dag_ids(user, user_actions, session)
-        return 
session.scalars(select(DagModel).where(DagModel.dag_id.in_(dag_ids)))
-
-    def get_readable_dag_ids(self, user) -> set[str]:
+    def get_readable_dag_ids(self, user=None) -> set[str]:
         """Gets the DAG IDs readable by authenticated user."""
-        return self.get_accessible_dag_ids(user, [permissions.ACTION_CAN_READ])
+        return self.get_permitted_dag_ids(methods=["GET"], user=user)
 
-    def get_editable_dag_ids(self, user) -> set[str]:
+    def get_editable_dag_ids(self, user=None) -> set[str]:
         """Gets the DAG IDs editable by authenticated user."""
-        return self.get_accessible_dag_ids(user, [permissions.ACTION_CAN_EDIT])
+        return self.get_permitted_dag_ids(methods=["PUT"], user=user)
 
     @provide_session
-    def get_accessible_dag_ids(
+    def get_permitted_dag_ids(
         self,
-        user,
-        user_actions: Container[str] | None = None,
+        *,
+        methods: Container[ResourceMethod] | None = None,
+        user=None,
         session: Session = NEW_SESSION,
     ) -> set[str]:
         """Generic function to get readable or writable DAGs for user."""
-        if not user_actions:
-            user_actions = [permissions.ACTION_CAN_EDIT, 
permissions.ACTION_CAN_READ]
+        if not methods:
+            methods = ["PUT", "GET"]
 
-        if not get_auth_manager().is_logged_in():
-            roles = user.roles
-        else:
-            if (permissions.ACTION_CAN_EDIT in user_actions and 
self.can_edit_all_dags(user)) or (
-                permissions.ACTION_CAN_READ in user_actions and 
self.can_read_all_dags(user)
-            ):
-                return {dag.dag_id for dag in 
session.execute(select(DagModel.dag_id))}
-            user_query = session.scalar(
-                select(User)
-                .options(
-                    joinedload(User.roles)
-                    .subqueryload(Role.permissions)
-                    .options(joinedload(Permission.action), 
joinedload(Permission.resource))
-                )
-                .where(User.id == user.id)
-            )
-            roles = user_query.roles
-
-        resources = set()
-        for role in roles:
-            for permission in role.permissions:
-                action = permission.action.name
-                if action in user_actions:
-                    resource = permission.resource.name
-                    if resource == permissions.RESOURCE_DAG:
-                        return {dag.dag_id for dag in 
session.execute(select(DagModel.dag_id))}
-                    if resource.startswith(permissions.RESOURCE_DAG_PREFIX):
-                        
resources.add(resource[len(permissions.RESOURCE_DAG_PREFIX) :])
-                    else:
-                        resources.add(resource)
-        return {
-            dag.dag_id
-            for dag in 
session.execute(select(DagModel.dag_id).where(DagModel.dag_id.in_(resources)))
-        }
-
-    def can_access_some_dags(self, action: str, dag_id: str | None = None) -> 
bool:
-        """Checks if user has read or write access to some dags."""
-        if dag_id and dag_id != "~":
-            root_dag_id = self._get_root_dag_id(dag_id)
-            return self.has_access(action, 
permissions.resource_name_for_dag(root_dag_id))
+        dag_ids = {dag.dag_id for dag in 
session.execute(select(DagModel.dag_id))}
 
-        user = g.user
-        if action == permissions.ACTION_CAN_READ:
-            return any(self.get_readable_dag_ids(user))
-        return any(self.get_editable_dag_ids(user))
+        if ("GET" in methods and 
get_auth_manager().is_authorized_dag(method="GET", user=user)) or (
+            "PUT" in methods and 
get_auth_manager().is_authorized_dag(method="PUT", user=user)
+        ):
+            return dag_ids
 
-    def can_read_dag(self, dag_id: str, user=None) -> bool:
-        """Determines whether a user has DAG read access."""
-        root_dag_id = self._get_root_dag_id(dag_id)
-        dag_resource_name = permissions.resource_name_for_dag(root_dag_id)
-        return self.has_access(permissions.ACTION_CAN_READ, dag_resource_name, 
user=user)
+        def _is_permitted_dag_id(method: ResourceMethod, methods: 
Container[ResourceMethod], dag_id: str):
+            return method in methods and get_auth_manager().is_authorized_dag(
+                method=method, details=DagDetails(id=dag_id), user=user
+            )
 
-    def can_edit_dag(self, dag_id: str, user=None) -> bool:
-        """Determines whether a user has DAG edit access."""
-        root_dag_id = self._get_root_dag_id(dag_id)
-        dag_resource_name = permissions.resource_name_for_dag(root_dag_id)
-        return self.has_access(permissions.ACTION_CAN_EDIT, dag_resource_name, 
user=user)
+        return {

Review Comment:
   I think if we leave it like this, it might incur a performance hit for FAB 
users who only have access to some DAGs. In the current implementation, reading 
all DAGs as a set is only done when the user has permission to read all DAGs in 
the role.
   
   Also, it hampers any future multi-tenant implementation of an auth manager, 
because it will not allow this to be optimized via auth_manager. 
For all users or all tenants, all dag_ids will always have to be read into 
memory whenever they want to see the DAGs page. This will become a problem very 
quickly for any multi-tenant environment.
   
   I **think** the solution for that should be different. Rather than 
implementing the whole loop here, `get_permitted_dag_ids` should be 
implemented in the `auth_manager`.  
   
   Here we can do some basic checks for all dags, but then getting the list of 
allowed dags should be delegated to auth_manager.
   
   ```python
           if ("GET" in methods and 
get_auth_manager().is_authorized_dag(method="GET", user=user)) or (
               "PUT" in methods and 
get_auth_manager().is_authorized_dag(method="PUT", user=user)
           ):
               return {dag.dag_id for dag in 
session.execute(select(DagModel.dag_id))}
   
           return get_auth_manager().get_permitted_dag_ids(methods=methods, 
user=user)
   ```
   
   Then each auth manager could implement an optimized version of the 
`get_permitted_dag_ids`. For FAB, it will be the original implementation with 
resources, but for others it could be different. Conceptually this for example:
   
   ```python
   tenant = get_tenant(user)
   return {dag.dag_id for dag in 
session.execute(select(DagModel.dag_id).filter_by(tenant=tenant))}
   ```
   
   It is no problem for the auth manager to have two ways of checking permissions:
   
   1) when you have a dag_id - you can check it individually
   2) when you need the list - you have a method that returns the list based on 
the same criteria but implemented without using the individual check (bulk).
   
   This is the only way to implement a performant version of such a permission 
check IMHO.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to