vincbeck commented on code in PR #35038: URL: https://github.com/apache/airflow/pull/35038#discussion_r1370347338
########## airflow/auth/managers/fab/security_manager/override.py: ########## @@ -1650,3 +1950,82 @@ def _ldap_calculate_user_roles(self, user_attributes: dict[str, list[bytes]]) -> log.warning("Can't find AUTH_USER_REGISTRATION role: %s", registration_role_name) return list(user_role_objects) + + def _oauth_calculate_user_roles(self, userinfo) -> list[str]: + user_role_objects = set() + + # apply AUTH_ROLES_MAPPING + if self.auth_roles_mapping: + user_role_keys = userinfo.get("role_keys", []) + user_role_objects.update(self.get_roles_from_keys(user_role_keys)) + + # apply AUTH_USER_REGISTRATION_ROLE + if self.auth_user_registration: + registration_role_name = self.auth_user_registration_role + + # if AUTH_USER_REGISTRATION_ROLE_JMESPATH is set, + # use it for the registration role + if self.auth_user_registration_role_jmespath: + import jmespath + + registration_role_name = jmespath.search(self.auth_user_registration_role_jmespath, userinfo) + + # lookup registration role in flask db + fab_role = self.find_role(registration_role_name) + if fab_role: + user_role_objects.add(fab_role) + else: + log.warning("Can't find AUTH_USER_REGISTRATION role: %s", registration_role_name) + + return list(user_role_objects) + + def _get_user_permission_resources( + self, user: User | None, action_name: str, resource_names: list[str] | None = None + ) -> set[str]: + """Get resource names with a certain action name that a user has access to. + + Mainly used to fetch all menu permissions on a single db call, will also + check public permissions and builtin roles + """ + if not resource_names: + resource_names = [] + + db_role_ids = [] + if user is None: + # include public role + roles = [self.get_public_role()] + else: + roles = user.roles + # First check against builtin (statically configured) roles + # because no database query is needed + result = set() + for role in roles: + if role.name in self.builtin_roles: + for resource_name in resource_names: + if self._has_access_builtin_roles(role, action_name, resource_name): + result.add(resource_name) + else: + db_role_ids.append(role.id) + # Then check against database-stored roles + role_resource_names = [ + perm.resource.name for perm in self.filter_roles_by_perm_with_action(action_name, db_role_ids) + ] + result.update(role_resource_names) + return result + + def filter_roles_by_perm_with_action(self, action_name: str, role_ids: list[int]): + """Find roles with permission.""" + return ( + self.appbuilder.get_session.query(self.permission_model) Review Comment: Let's do it in another PR. Some other code might not be compatible with SQLALchemy 2, so let's create another PR for that purpose -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org