potiuk commented on code in PR #34317: URL: https://github.com/apache/airflow/pull/34317#discussion_r1355994191
########## airflow/www/security_manager.py: ########## @@ -269,125 +272,47 @@ def get_user_roles(user=None): user = g.user return user.roles - def get_readable_dags(self, user) -> Iterable[DagModel]: - """Gets the DAGs readable by authenticated user.""" - warnings.warn( - "`get_readable_dags` has been deprecated. Please use `get_readable_dag_ids` instead.", - RemovedInAirflow3Warning, - stacklevel=2, - ) - with warnings.catch_warnings(): - warnings.simplefilter("ignore", RemovedInAirflow3Warning) - return self.get_accessible_dags([permissions.ACTION_CAN_READ], user) - - def get_editable_dags(self, user) -> Iterable[DagModel]: - """Gets the DAGs editable by authenticated user.""" - warnings.warn( - "`get_editable_dags` has been deprecated. Please use `get_editable_dag_ids` instead.", - RemovedInAirflow3Warning, - stacklevel=2, - ) - with warnings.catch_warnings(): - warnings.simplefilter("ignore", RemovedInAirflow3Warning) - return self.get_accessible_dags([permissions.ACTION_CAN_EDIT], user) - - @provide_session - def get_accessible_dags( - self, - user_actions: Container[str] | None, - user, - session: Session = NEW_SESSION, - ) -> Iterable[DagModel]: - warnings.warn( - "`get_accessible_dags` has been deprecated. 
Please use `get_accessible_dag_ids` instead.", - RemovedInAirflow3Warning, - stacklevel=3, - ) - dag_ids = self.get_accessible_dag_ids(user, user_actions, session) - return session.scalars(select(DagModel).where(DagModel.dag_id.in_(dag_ids))) - - def get_readable_dag_ids(self, user) -> set[str]: + def get_readable_dag_ids(self, user=None) -> set[str]: """Gets the DAG IDs readable by authenticated user.""" - return self.get_accessible_dag_ids(user, [permissions.ACTION_CAN_READ]) + return self.get_permitted_dag_ids(methods=["GET"], user=user) - def get_editable_dag_ids(self, user) -> set[str]: + def get_editable_dag_ids(self, user=None) -> set[str]: """Gets the DAG IDs editable by authenticated user.""" - return self.get_accessible_dag_ids(user, [permissions.ACTION_CAN_EDIT]) + return self.get_permitted_dag_ids(methods=["PUT"], user=user) @provide_session - def get_accessible_dag_ids( + def get_permitted_dag_ids( self, - user, - user_actions: Container[str] | None = None, + *, + methods: Container[ResourceMethod] | None = None, + user=None, session: Session = NEW_SESSION, ) -> set[str]: """Generic function to get readable or writable DAGs for user.""" - if not user_actions: - user_actions = [permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ] + if not methods: + methods = ["PUT", "GET"] - if not get_auth_manager().is_logged_in(): - roles = user.roles - else: - if (permissions.ACTION_CAN_EDIT in user_actions and self.can_edit_all_dags(user)) or ( - permissions.ACTION_CAN_READ in user_actions and self.can_read_all_dags(user) - ): - return {dag.dag_id for dag in session.execute(select(DagModel.dag_id))} - user_query = session.scalar( - select(User) - .options( - joinedload(User.roles) - .subqueryload(Role.permissions) - .options(joinedload(Permission.action), joinedload(Permission.resource)) - ) - .where(User.id == user.id) - ) - roles = user_query.roles - - resources = set() - for role in roles: - for permission in role.permissions: - action = 
permission.action.name - if action in user_actions: - resource = permission.resource.name - if resource == permissions.RESOURCE_DAG: - return {dag.dag_id for dag in session.execute(select(DagModel.dag_id))} - if resource.startswith(permissions.RESOURCE_DAG_PREFIX): - resources.add(resource[len(permissions.RESOURCE_DAG_PREFIX) :]) - else: - resources.add(resource) - return { - dag.dag_id - for dag in session.execute(select(DagModel.dag_id).where(DagModel.dag_id.in_(resources))) - } - - def can_access_some_dags(self, action: str, dag_id: str | None = None) -> bool: - """Checks if user has read or write access to some dags.""" - if dag_id and dag_id != "~": - root_dag_id = self._get_root_dag_id(dag_id) - return self.has_access(action, permissions.resource_name_for_dag(root_dag_id)) + dag_ids = {dag.dag_id for dag in session.execute(select(DagModel.dag_id))} - user = g.user - if action == permissions.ACTION_CAN_READ: - return any(self.get_readable_dag_ids(user)) - return any(self.get_editable_dag_ids(user)) + if ("GET" in methods and get_auth_manager().is_authorized_dag(method="GET", user=user)) or ( + "PUT" in methods and get_auth_manager().is_authorized_dag(method="PUT", user=user) + ): + return dag_ids - def can_read_dag(self, dag_id: str, user=None) -> bool: - """Determines whether a user has DAG read access.""" - root_dag_id = self._get_root_dag_id(dag_id) - dag_resource_name = permissions.resource_name_for_dag(root_dag_id) - return self.has_access(permissions.ACTION_CAN_READ, dag_resource_name, user=user) + def _is_permitted_dag_id(method: ResourceMethod, methods: Container[ResourceMethod], dag_id: str): + return method in methods and get_auth_manager().is_authorized_dag( + method=method, details=DagDetails(id=dag_id), user=user + ) - def can_edit_dag(self, dag_id: str, user=None) -> bool: - """Determines whether a user has DAG edit access.""" - root_dag_id = self._get_root_dag_id(dag_id) - dag_resource_name = permissions.resource_name_for_dag(root_dag_id) - 
return self.has_access(permissions.ACTION_CAN_EDIT, dag_resource_name, user=user) + return { Review Comment: I think if we leave it like this, it might incur a performance hit for FAB users who only have access to some dags. In the current implementation reading all dags as a set is only done when the user has permission to read all dags in the role. Also it hampers any future multi-tenant implementation of auth manager, because basically it will not allow this to be optimized via auth_manager. Basically for all users or all tenants, they will always have to read all dag_ids into memory whenever they want to see the DAGs page. This will become a problem very quickly for any multi-tenant environment. I **think** the solution for that should be different. Rather than implementing the whole loop here, the "get_permitted_dag_ids" should be implemented in the `auth_manager`. Here we can do some basic checks for all dags, but then getting the list of allowed dags should be delegated to auth_manager. ```python if ("GET" in methods and get_auth_manager().is_authorized_dag(method="GET", user=user)) or ( "PUT" in methods and get_auth_manager().is_authorized_dag(method="PUT", user=user) ): return {dag.dag_id for dag in session.execute(select(DagModel.dag_id))} return get_auth_manager().get_permitted_dag_ids(methods=methods, user=user) ``` Then each auth manager could implement an optimized version of the `get_permitted_dag_ids`. For FAB, it will be the original implementation with resources, but for others it could be different. 
Conceptually, something like this, for example: ```python tenant = get_tenant(user) return {dag.dag_id for dag in session.execute(select(DagModel.dag_id).filter_by(tenant=tenant))} ``` It is no problem for the auth manager to have two ways of checking permissions: 1) when you have a dag_id - you can check it individually 2) when you get the list you have a method that returns the list based on the same criteria but implemented without using the individual check. This is the only way to implement a performant version of such a permission check IMHO. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org