vincbeck commented on code in PR #34317:
URL: https://github.com/apache/airflow/pull/34317#discussion_r1357158265


##########
airflow/www/security_manager.py:
##########
@@ -269,125 +272,47 @@ def get_user_roles(user=None):
             user = g.user
         return user.roles
 
-    def get_readable_dags(self, user) -> Iterable[DagModel]:
-        """Gets the DAGs readable by authenticated user."""
-        warnings.warn(
-            "`get_readable_dags` has been deprecated. Please use 
`get_readable_dag_ids` instead.",
-            RemovedInAirflow3Warning,
-            stacklevel=2,
-        )
-        with warnings.catch_warnings():
-            warnings.simplefilter("ignore", RemovedInAirflow3Warning)
-            return self.get_accessible_dags([permissions.ACTION_CAN_READ], 
user)
-
-    def get_editable_dags(self, user) -> Iterable[DagModel]:
-        """Gets the DAGs editable by authenticated user."""
-        warnings.warn(
-            "`get_editable_dags` has been deprecated. Please use 
`get_editable_dag_ids` instead.",
-            RemovedInAirflow3Warning,
-            stacklevel=2,
-        )
-        with warnings.catch_warnings():
-            warnings.simplefilter("ignore", RemovedInAirflow3Warning)
-            return self.get_accessible_dags([permissions.ACTION_CAN_EDIT], 
user)
-
-    @provide_session
-    def get_accessible_dags(
-        self,
-        user_actions: Container[str] | None,
-        user,
-        session: Session = NEW_SESSION,
-    ) -> Iterable[DagModel]:
-        warnings.warn(
-            "`get_accessible_dags` has been deprecated. Please use 
`get_accessible_dag_ids` instead.",
-            RemovedInAirflow3Warning,
-            stacklevel=3,
-        )
-        dag_ids = self.get_accessible_dag_ids(user, user_actions, session)
-        return 
session.scalars(select(DagModel).where(DagModel.dag_id.in_(dag_ids)))
-
-    def get_readable_dag_ids(self, user) -> set[str]:
+    def get_readable_dag_ids(self, user=None) -> set[str]:
         """Gets the DAG IDs readable by authenticated user."""
-        return self.get_accessible_dag_ids(user, [permissions.ACTION_CAN_READ])
+        return self.get_permitted_dag_ids(methods=["GET"], user=user)
 
-    def get_editable_dag_ids(self, user) -> set[str]:
+    def get_editable_dag_ids(self, user=None) -> set[str]:
         """Gets the DAG IDs editable by authenticated user."""
-        return self.get_accessible_dag_ids(user, [permissions.ACTION_CAN_EDIT])
+        return self.get_permitted_dag_ids(methods=["PUT"], user=user)
 
     @provide_session
-    def get_accessible_dag_ids(
+    def get_permitted_dag_ids(
         self,
-        user,
-        user_actions: Container[str] | None = None,
+        *,
+        methods: Container[ResourceMethod] | None = None,
+        user=None,
         session: Session = NEW_SESSION,
     ) -> set[str]:
         """Generic function to get readable or writable DAGs for user."""
-        if not user_actions:
-            user_actions = [permissions.ACTION_CAN_EDIT, 
permissions.ACTION_CAN_READ]
+        if not methods:
+            methods = ["PUT", "GET"]
 
-        if not get_auth_manager().is_logged_in():
-            roles = user.roles
-        else:
-            if (permissions.ACTION_CAN_EDIT in user_actions and 
self.can_edit_all_dags(user)) or (
-                permissions.ACTION_CAN_READ in user_actions and 
self.can_read_all_dags(user)
-            ):
-                return {dag.dag_id for dag in 
session.execute(select(DagModel.dag_id))}
-            user_query = session.scalar(
-                select(User)
-                .options(
-                    joinedload(User.roles)
-                    .subqueryload(Role.permissions)
-                    .options(joinedload(Permission.action), 
joinedload(Permission.resource))
-                )
-                .where(User.id == user.id)
-            )
-            roles = user_query.roles
-
-        resources = set()
-        for role in roles:
-            for permission in role.permissions:
-                action = permission.action.name
-                if action in user_actions:
-                    resource = permission.resource.name
-                    if resource == permissions.RESOURCE_DAG:
-                        return {dag.dag_id for dag in 
session.execute(select(DagModel.dag_id))}
-                    if resource.startswith(permissions.RESOURCE_DAG_PREFIX):
-                        
resources.add(resource[len(permissions.RESOURCE_DAG_PREFIX) :])
-                    else:
-                        resources.add(resource)
-        return {
-            dag.dag_id
-            for dag in 
session.execute(select(DagModel.dag_id).where(DagModel.dag_id.in_(resources)))
-        }
-
-    def can_access_some_dags(self, action: str, dag_id: str | None = None) -> 
bool:
-        """Checks if user has read or write access to some dags."""
-        if dag_id and dag_id != "~":
-            root_dag_id = self._get_root_dag_id(dag_id)
-            return self.has_access(action, 
permissions.resource_name_for_dag(root_dag_id))
+        dag_ids = {dag.dag_id for dag in 
session.execute(select(DagModel.dag_id))}
 
-        user = g.user
-        if action == permissions.ACTION_CAN_READ:
-            return any(self.get_readable_dag_ids(user))
-        return any(self.get_editable_dag_ids(user))
+        if ("GET" in methods and 
get_auth_manager().is_authorized_dag(method="GET", user=user)) or (
+            "PUT" in methods and 
get_auth_manager().is_authorized_dag(method="PUT", user=user)
+        ):
+            return dag_ids
 
-    def can_read_dag(self, dag_id: str, user=None) -> bool:
-        """Determines whether a user has DAG read access."""
-        root_dag_id = self._get_root_dag_id(dag_id)
-        dag_resource_name = permissions.resource_name_for_dag(root_dag_id)
-        return self.has_access(permissions.ACTION_CAN_READ, dag_resource_name, 
user=user)
+        def _is_permitted_dag_id(method: ResourceMethod, methods: 
Container[ResourceMethod], dag_id: str):
+            return method in methods and get_auth_manager().is_authorized_dag(
+                method=method, details=DagDetails(id=dag_id), user=user
+            )
 
-    def can_edit_dag(self, dag_id: str, user=None) -> bool:
-        """Determines whether a user has DAG edit access."""
-        root_dag_id = self._get_root_dag_id(dag_id)
-        dag_resource_name = permissions.resource_name_for_dag(root_dag_id)
-        return self.has_access(permissions.ACTION_CAN_EDIT, dag_resource_name, 
user=user)
+        return {

Review Comment:
   I see. I was also worried about the performance issue. The only concern I 
have with this solution is then we will have to implement such function for 
every resource we want be tenant aware in the future (connections, variables, 
...). All auth managers will have to implement methods like 
`get_permitted_connections`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to