This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new c2046245c0 Move methods used only in FAB from security managers to auth manager (#35203) c2046245c0 is described below commit c2046245c07fdd6eb05b996cc67c203c5ac456b6 Author: Vincent <97131062+vincb...@users.noreply.github.com> AuthorDate: Wed Nov 1 15:59:53 2023 -0400 Move methods used only in FAB from security managers to auth manager (#35203) --- airflow/auth/managers/base_auth_manager.py | 50 ++--- .../auth/managers/fab/cli_commands/role_command.py | 2 +- airflow/auth/managers/fab/fab_auth_manager.py | 30 ++- .../managers/fab/security_manager/constants.py | 26 +++ .../auth/managers/fab/security_manager/override.py | 226 ++++++++++++++++++-- airflow/www/app.py | 13 +- airflow/www/extensions/init_appbuilder.py | 14 +- airflow/www/extensions/init_auth_manager.py | 7 +- airflow/www/security_manager.py | 234 +-------------------- .../test_role_and_permission_endpoint.py | 2 +- tests/auth/managers/fab/test_fab_auth_manager.py | 73 ++++--- tests/auth/managers/test_base_auth_manager.py | 88 ++++++-- tests/conftest.py | 4 +- tests/test_utils/api_connexion_utils.py | 2 +- tests/test_utils/decorators.py | 1 - tests/www/test_security.py | 4 +- 16 files changed, 426 insertions(+), 350 deletions(-) diff --git a/airflow/auth/managers/base_auth_manager.py b/airflow/auth/managers/base_auth_manager.py index 2a5b1312a0..7d79fcce3e 100644 --- a/airflow/auth/managers/base_auth_manager.py +++ b/airflow/auth/managers/base_auth_manager.py @@ -18,11 +18,14 @@ from __future__ import annotations from abc import abstractmethod +from functools import cached_property from typing import TYPE_CHECKING, Container, Literal from sqlalchemy import select -from airflow.exceptions import AirflowException +from airflow.auth.managers.models.resource_details import ( + DagDetails, +) from airflow.models import DagModel from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.session import NEW_SESSION, provide_session @@ -37,12 +40,12 @@ if TYPE_CHECKING: ConfigurationDetails, ConnectionDetails, DagAccessEntity, - DagDetails, DatasetDetails, PoolDetails, VariableDetails, ) from airflow.cli.cli_config import CLICommand + from airflow.www.extensions.init_appbuilder import AirflowAppBuilder from airflow.www.security_manager import AirflowSecurityManagerV2 ResourceMethod = Literal["GET", "POST", "PUT", "DELETE"] @@ -55,9 +58,10 @@ class BaseAuthManager(LoggingMixin): Auth managers are responsible for any user management related operation such as login, logout, authz, ... """ - def __init__(self, app: Flask) -> None: - self._security_manager: AirflowSecurityManagerV2 | None = None + def __init__(self, app: Flask, appbuilder: AirflowAppBuilder) -> None: + super().__init__() self.app = app + self.appbuilder = appbuilder @staticmethod def get_cli_commands() -> list[CLICommand]: @@ -87,6 +91,13 @@ class BaseAuthManager(LoggingMixin): def get_user_id(self) -> str: """Return the user ID associated to the user in session.""" + def init(self) -> None: + """ + Run operations when Airflow is initializing. + + By default, do nothing. + """ + @abstractmethod def is_logged_in(self) -> bool: """Return whether the user is logged in.""" @@ -267,32 +278,17 @@ class BaseAuthManager(LoggingMixin): def get_url_user_profile(self) -> str | None: """Return the url to a page displaying info about the current user.""" - def get_security_manager_override_class(self) -> type: + @cached_property + def security_manager(self) -> AirflowSecurityManagerV2: """ - Return the security manager override class. + Return the security manager. - The security manager override class is responsible for overriding the default security manager - class airflow.www.security_manager.AirflowSecurityManagerV2 with a custom implementation. - This class is essentially inherited from airflow.www.security_manager.AirflowSecurityManagerV2. + By default, Airflow comes with the default security manager + airflow.www.security_manager.AirflowSecurityManagerV2. The auth manager might need to extend this + default security manager for its own purposes. - By default, return the generic AirflowSecurityManagerV2. + By default, return the default AirflowSecurityManagerV2. """ from airflow.www.security_manager import AirflowSecurityManagerV2 - return AirflowSecurityManagerV2 - - @property - def security_manager(self) -> AirflowSecurityManagerV2: - """Get the security manager.""" - if not self._security_manager: - raise AirflowException("Security manager not defined.") - return self._security_manager - - @security_manager.setter - def security_manager(self, security_manager: AirflowSecurityManagerV2): - """ - Set the security manager. - - :param security_manager: the security manager - """ - self._security_manager = security_manager + return AirflowSecurityManagerV2(self.appbuilder) diff --git a/airflow/auth/managers/fab/cli_commands/role_command.py b/airflow/auth/managers/fab/cli_commands/role_command.py index 59ced49852..5738a2e413 100644 --- a/airflow/auth/managers/fab/cli_commands/role_command.py +++ b/airflow/auth/managers/fab/cli_commands/role_command.py @@ -25,11 +25,11 @@ from collections import defaultdict from typing import TYPE_CHECKING from airflow.auth.managers.fab.cli_commands.utils import get_application_builder +from airflow.auth.managers.fab.security_manager.constants import EXISTING_ROLES from airflow.cli.simple_table import AirflowConsole from airflow.utils import cli as cli_utils from airflow.utils.cli import suppress_logs_and_warning from airflow.utils.providers_configuration_loader import providers_configuration_loaded -from airflow.www.security_manager import EXISTING_ROLES if TYPE_CHECKING: from airflow.auth.managers.fab.models import Action, Permission, Resource, Role diff --git a/airflow/auth/managers/fab/fab_auth_manager.py b/airflow/auth/managers/fab/fab_auth_manager.py index 1b98c33532..1b65e3b76d 100644 --- a/airflow/auth/managers/fab/fab_auth_manager.py +++ b/airflow/auth/managers/fab/fab_auth_manager.py @@ -18,6 +18,7 @@ from __future__ import annotations import warnings +from functools import cached_property from pathlib import Path from typing import TYPE_CHECKING, Container @@ -82,6 +83,7 @@ from airflow.utils.yaml import safe_load from airflow.www.extensions.init_views import _CustomErrorRequestBodyValidator, _LazyResolver if TYPE_CHECKING: + from airflow.auth.managers.fab.security_manager.override import FabAirflowSecurityManagerOverride from airflow.auth.managers.models.base_user import BaseUser from airflow.cli.cli_config import ( CLICommand, @@ -180,6 +182,10 @@ class FabAuthManager(BaseAuthManager): """Return the user ID associated to the user in session.""" return str(self.get_user().get_id()) + def init(self) -> None: + """Run operations when Airflow is initializing.""" + self._sync_appbuilder_roles() + def is_logged_in(self) -> bool: """Return whether the user is logged in.""" return not self.get_user().is_anonymous @@ -328,8 +334,9 @@ class FabAuthManager(BaseAuthManager): for dag in session.execute(select(DagModel.dag_id).where(DagModel.dag_id.in_(resources))) } - def get_security_manager_override_class(self) -> type: - """Return the security manager override.""" + @cached_property + def security_manager(self) -> FabAirflowSecurityManagerOverride: + """Return the security manager specific to FAB.""" from airflow.auth.managers.fab.security_manager.override import FabAirflowSecurityManagerOverride from airflow.www.security import AirflowSecurityManager @@ -346,9 +353,9 @@ class FabAuthManager(BaseAuthManager): "FabAirflowSecurityManagerOverride instead of AirflowSecurityManager.", DeprecationWarning, ) - return sm_from_config + return sm_from_config(self.appbuilder) - return FabAirflowSecurityManagerOverride # default choice + return FabAirflowSecurityManagerOverride(self.appbuilder) def get_url_login(self, **kwargs) -> str: """Return the login page url.""" @@ -489,7 +496,20 @@ class FabAuthManager(BaseAuthManager): :meta private: """ if "." in dag_id: - return self.security_manager.appbuilder.get_session.scalar( + return self.appbuilder.get_session.scalar( select(DagModel.dag_id, DagModel.root_dag_id).where(DagModel.dag_id == dag_id).limit(1) ) return dag_id + + def _sync_appbuilder_roles(self): + """ + Sync appbuilder roles to DB. + + :meta private: + """ + # Garbage collect old permissions/views after they have been modified. + # Otherwise, when the name of a view or menu is changed, the framework + # will add the new Views and Menus names to the backend, but will not + # delete the old ones. + if conf.getboolean("webserver", "UPDATE_FAB_PERMS"): + self.security_manager.sync_roles() diff --git a/airflow/auth/managers/fab/security_manager/constants.py b/airflow/auth/managers/fab/security_manager/constants.py new file mode 100644 index 0000000000..fd600ec267 --- /dev/null +++ b/airflow/auth/managers/fab/security_manager/constants.py @@ -0,0 +1,26 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +EXISTING_ROLES = { + "Admin", + "Viewer", + "User", + "Op", + "Public", +} diff --git a/airflow/auth/managers/fab/security_manager/override.py b/airflow/auth/managers/fab/security_manager/override.py index 9f994f5c50..9bc34912f2 100644 --- a/airflow/auth/managers/fab/security_manager/override.py +++ b/airflow/auth/managers/fab/security_manager/override.py @@ -18,6 +18,7 @@ from __future__ import annotations import datetime +import itertools import logging import os import random @@ -49,8 +50,9 @@ from flask_jwt_extended import JWTManager, current_user as current_user_jwt from flask_login import LoginManager from itsdangerous import want_bytes from markupsafe import Markup -from sqlalchemy import and_, func, inspect, select +from sqlalchemy import and_, func, inspect, or_, select from sqlalchemy.exc import MultipleResultsFound +from sqlalchemy.orm import Session, joinedload from werkzeug.security import check_password_hash, generate_password_hash from airflow.auth.managers.fab.fab_auth_manager import MAP_METHOD_NAME_TO_FAB_ACTION_NAME @@ -63,9 +65,10 @@ from airflow.auth.managers.fab.models import ( assoc_permission_role, ) from airflow.auth.managers.fab.models.anonymous_user import AnonymousUser +from airflow.auth.managers.fab.security_manager.constants import EXISTING_ROLES from airflow.configuration import conf from airflow.exceptions import AirflowException, RemovedInAirflow3Warning -from airflow.models import DagModel +from airflow.models import DagBag, DagModel from airflow.security import permissions from airflow.utils.session import NEW_SESSION, provide_session from airflow.www.extensions.init_auth_manager import get_auth_manager @@ -73,8 +76,6 @@ from airflow.www.security_manager import AirflowSecurityManagerV2 from airflow.www.session import AirflowDatabaseSessionInterface if TYPE_CHECKING: - from sqlalchemy.orm import Session - from airflow.auth.managers.base_auth_manager import ResourceMethod from airflow.auth.managers.fab.models import User from airflow.www.fab_security.manager import BaseSecurityManager @@ -126,6 +127,9 @@ class FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2): oauth_allow_list: dict[str, list] = {} """ Initialized (remote_app) providers dict {'provider_name', OBJ } """ + # global resource for dag-level access + DAG_RESOURCES = {permissions.RESOURCE_DAG} + def __init__(self, appbuilder): # done in super, but we need it before we can call super. self.appbuilder = appbuilder @@ -760,6 +764,160 @@ class FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2): return any(self.get_readable_dag_ids(user)) return any(self.get_editable_dag_ids(user)) + def get_all_permissions(self) -> set[tuple[str, str]]: + """Return all permissions as a set of tuples with the action and resource names.""" + return set( + self.appbuilder.get_session.execute( + select(self.action_model.name, self.resource_model.name) + .join(self.permission_model.action) + .join(self.permission_model.resource) + ) + ) + + def create_dag_specific_permissions(self) -> None: + """ + Add permissions to all DAGs. + + Creates 'can_read', 'can_edit', and 'can_delete' permissions for all + DAGs, along with any `access_control` permissions provided in them. + + This does iterate through ALL the DAGs, which can be slow. See `sync_perm_for_dag` + if you only need to sync a single DAG. + """ + perms = self.get_all_permissions() + dagbag = DagBag(read_dags_from_db=True) + dagbag.collect_dags_from_db() + dags = dagbag.dags.values() + + for dag in dags: + root_dag_id = dag.parent_dag.dag_id if dag.parent_dag else dag.dag_id + dag_resource_name = permissions.resource_name_for_dag(root_dag_id) + for action_name in self.DAG_ACTIONS: + if (action_name, dag_resource_name) not in perms: + self._merge_perm(action_name, dag_resource_name) + + if dag.access_control is not None: + self.sync_perm_for_dag(dag_resource_name, dag.access_control) + + def sync_roles(self) -> None: + """ + Initialize default and custom roles with related permissions. + + 1. Init the default role(Admin, Viewer, User, Op, public) + with related permissions. + 2. Init the custom role(dag-user) with related permissions. + """ + # Create global all-dag permissions + self.create_perm_vm_for_all_dag() + + # Sync the default roles (Admin, Viewer, User, Op, public) with related permissions + self.bulk_sync_roles(self.ROLE_CONFIGS) + + self.add_homepage_access_to_custom_roles() + # init existing roles, the rest role could be created through UI. + self.update_admin_permission() + self.clean_perms() + + def create_perm_vm_for_all_dag(self) -> None: + """Create perm-vm if not exist and insert into FAB security model for all-dags.""" + # create perm for global logical dag + for resource_name, action_name in itertools.product(self.DAG_RESOURCES, self.DAG_ACTIONS): + self._merge_perm(action_name, resource_name) + + def add_homepage_access_to_custom_roles(self) -> None: + """Add Website.can_read access to all custom roles.""" + website_permission = self.create_permission(permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE) + custom_roles = [role for role in self.get_all_roles() if role.name not in EXISTING_ROLES] + for role in custom_roles: + self.add_permission_to_role(role, website_permission) + + self.appbuilder.get_session.commit() + + def update_admin_permission(self) -> None: + """ + Add missing permissions to the table for admin. + + Admin should get all the permissions, except the dag permissions + because Admin already has Dags permission. + Add the missing ones to the table for admin. + """ + session = self.appbuilder.get_session + dag_resources = session.scalars( + select(Resource).where(Resource.name.like(f"{permissions.RESOURCE_DAG_PREFIX}%")) + ) + resource_ids = [resource.id for resource in dag_resources] + + perms = session.scalars(select(Permission).where(~Permission.resource_id.in_(resource_ids))) + perms = [p for p in perms if p.action and p.resource] + + admin = self.find_role("Admin") + admin.permissions = list(set(admin.permissions) | set(perms)) + + session.commit() + + def clean_perms(self) -> None: + """FAB leaves faulty permissions that need to be cleaned up.""" + self.log.debug("Cleaning faulty perms") + sesh = self.appbuilder.get_session + perms = sesh.query(Permission).filter( + or_( + Permission.action == None, # noqa + Permission.resource == None, # noqa + ) + ) + # Since FAB doesn't define ON DELETE CASCADE on these tables, we need + # to delete the _object_ so that SQLA knows to delete the many-to-many + # relationship object too. :( + + deleted_count = 0 + for perm in perms: + sesh.delete(perm) + deleted_count += 1 + sesh.commit() + if deleted_count: + self.log.info("Deleted %s faulty permissions", deleted_count) + + def init_role(self, role_name, perms) -> None: + """ + Initialize the role with actions and related resources. + + :param role_name: + :param perms: + """ + warnings.warn( + "`init_role` has been deprecated. Please use `bulk_sync_roles` instead.", + RemovedInAirflow3Warning, + stacklevel=2, + ) + self.bulk_sync_roles([{"role": role_name, "perms": perms}]) + + def bulk_sync_roles(self, roles: Iterable[dict[str, Any]]) -> None: + """Sync the provided roles and permissions.""" + existing_roles = self._get_all_roles_with_permissions() + non_dag_perms = self._get_all_non_dag_permissions() + + for config in roles: + role_name = config["role"] + perms = config["perms"] + role = existing_roles.get(role_name) or self.add_role(role_name) + + for action_name, resource_name in perms: + perm = non_dag_perms.get((action_name, resource_name)) or self.create_permission( + action_name, resource_name + ) + + if perm not in role.permissions: + self.add_permission_to_role(role, perm) + + def sync_resource_permissions(self, perms: Iterable[tuple[str, str]] | None = None) -> None: + """Populate resource-based permissions.""" + if not perms: + return + + for action_name, resource_name in perms: + self.create_resource(resource_name) + self.create_permission(action_name, resource_name) + """ ----------- Role entity @@ -833,7 +991,6 @@ class FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2): - we use AUTH_ROLES_MAPPING to map from keys, to FAB role names :param role_keys: the list of FAB role keys - :return: a list of Role """ _roles = set() _role_keys = set(role_keys) @@ -1021,7 +1178,6 @@ class FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2): Gets an existing action record. :param name: name - :return: Action record, if it exists """ return self.get_session.query(self.action_model).filter_by(name=name).one_or_none() @@ -1091,7 +1247,6 @@ class FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2): Create a resource with the given name. :param name: The name of the resource to create created. - :return: The FAB resource created. """ resource = self.get_resource(name) if resource is None: @@ -1107,11 +1262,7 @@ class FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2): return resource def get_all_resources(self) -> list[Resource]: - """ - Gets all existing resource records. - - :return: List of all resources - """ + """Gets all existing resource records.""" return self.get_session.query(self.resource_model).all() def delete_resource(self, name: str) -> bool: @@ -1158,7 +1309,6 @@ class FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2): :param action_name: Name of action :param resource_name: Name of resource - :return: The existing permission """ action = self.get_action(action_name) resource = self.get_resource(resource_name) @@ -1175,7 +1325,6 @@ class FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2): Retrieve permission pairs associated with a specific resource object. :param resource: Object representing a single resource. - :return: Action objects representing resource->action pair """ return self.get_session.query(self.permission_model).filter_by(resource_id=resource.id).all() @@ -1215,7 +1364,6 @@ class FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2): :param action_name: Name of existing action :param resource_name: Name of existing resource - :return: None """ if not (action_name and resource_name): return @@ -1246,7 +1394,6 @@ class FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2): :param role: The role about to get a new permission. :param permission: The permission pair to add to a role. - :return: None """ if permission and permission not in role.permissions: try: @@ -2013,6 +2160,53 @@ class FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2): result.update(role_resource_names) return result + def _merge_perm(self, action_name: str, resource_name: str) -> None: + """ + Add the new (action, resource) to assoc_permission_role if it doesn't exist. + + It will add the related entry to ab_permission and ab_resource two meta tables as well. + + :param action_name: Name of the action + :param resource_name: Name of the resource + """ + action = self.get_action(action_name) + resource = self.get_resource(resource_name) + perm = None + if action and resource: + perm = self.appbuilder.get_session.scalar( + select(self.permission_model).filter_by(action=action, resource=resource).limit(1) + ) + if not perm and action_name and resource_name: + self.create_permission(action_name, resource_name) + + def _get_all_roles_with_permissions(self) -> dict[str, Role]: + """Return a dict with a key of role name and value of role with early loaded permissions.""" + return { + r.name: r + for r in self.appbuilder.get_session.scalars( + select(self.role_model).options(joinedload(self.role_model.permissions)) + ).unique() + } + + def _get_all_non_dag_permissions(self) -> dict[tuple[str, str], Permission]: + """ + Get permissions except those that are for specific DAGs. + + Returns a dict with a key of (action_name, resource_name) and value of permission + with all permissions except those that are for specific DAGs. + """ + return { + (action_name, resource_name): viewmodel + for action_name, resource_name, viewmodel in ( + self.appbuilder.get_session.execute( + select(self.action_model.name, self.resource_model.name, self.permission_model) + .join(self.permission_model.action) + .join(self.permission_model.resource) + .where(~self.resource_model.name.like(f"{permissions.RESOURCE_DAG_PREFIX}%")) + ) + ) + } + def filter_roles_by_perm_with_action(self, action_name: str, role_ids: list[int]): """Find roles with permission.""" return ( diff --git a/airflow/www/app.py b/airflow/www/app.py index ac6b87e79d..b8be40a421 100644 --- a/airflow/www/app.py +++ b/airflow/www/app.py @@ -36,6 +36,7 @@ from airflow.settings import _ENABLE_AIP_44 from airflow.utils.json import AirflowJsonProvider from airflow.www.extensions.init_appbuilder import init_appbuilder from airflow.www.extensions.init_appbuilder_links import init_appbuilder_links +from airflow.www.extensions.init_auth_manager import get_auth_manager from airflow.www.extensions.init_cache import init_cache from airflow.www.extensions.init_dagbag import init_dagbag from airflow.www.extensions.init_jinja_globals import init_jinja_globals @@ -67,16 +68,6 @@ app: Flask | None = None csrf = CSRFProtect() -def sync_appbuilder_roles(flask_app): - """Sync appbuilder roles to DB.""" - # Garbage collect old permissions/views after they have been modified. - # Otherwise, when the name of a view or menu is changed, the framework - # will add the new Views and Menus names to the backend, but will not - # delete the old ones. - if conf.getboolean("webserver", "UPDATE_FAB_PERMS"): - flask_app.appbuilder.sm.sync_roles() - - def create_app(config=None, testing=False): """Create a new instance of Airflow WWW app.""" flask_app = Flask(__name__) @@ -174,7 +165,7 @@ def create_app(config=None, testing=False): init_api_auth_provider(flask_app) init_api_error_handlers(flask_app) # needs to be after all api inits to let them add their path first - sync_appbuilder_roles(flask_app) + get_auth_manager().init() init_jinja_globals(flask_app) init_xframe_protection(flask_app) diff --git a/airflow/www/extensions/init_appbuilder.py b/airflow/www/extensions/init_appbuilder.py index acac0f2338..4503b81b43 100644 --- a/airflow/www/extensions/init_appbuilder.py +++ b/airflow/www/extensions/init_appbuilder.py @@ -96,12 +96,11 @@ class AirflowAppBuilder: app = Flask(__name__) app.config.from_object('config') dbmongo = MongoEngine(app) - appbuilder = AppBuilder(app, security_manager_class=SecurityManager) + appbuilder = AppBuilder(app) You can also create everything as an application factory. """ baseviews: list[BaseView | Session] = [] - security_manager_class = None # Flask app app = None # Database Session @@ -132,7 +131,6 @@ class AirflowAppBuilder: base_template="airflow/main.html", static_folder="static/appbuilder", static_url_path="/appbuilder", - security_manager_class=None, update_perms=conf.getboolean("webserver", "UPDATE_FAB_PERMS"), auth_rate_limited=conf.getboolean("webserver", "AUTH_RATE_LIMITED", fallback=True), auth_rate_limit=conf.get("webserver", "AUTH_RATE_LIMIT", fallback="5 per 40 second"), @@ -152,8 +150,6 @@ class AirflowAppBuilder: optional, your override for the global static folder :param static_url_path: optional, your override for the global static url path - :param security_manager_class: - optional, pass your own security manager class :param update_perms: optional, update permissions flag (Boolean) you can use FAB_UPDATE_PERMS config key also @@ -167,7 +163,6 @@ class AirflowAppBuilder: self.addon_managers = {} self.menu = menu self.base_template = base_template - self.security_manager_class = security_manager_class self.indexview = indexview self.static_folder = static_folder self.static_url_path = static_url_path @@ -219,9 +214,8 @@ class AirflowAppBuilder: self._addon_managers = app.config["ADDON_MANAGERS"] self.session = session - self.sm = self.security_manager_class(self) - auth_manager = get_auth_manager() - auth_manager.security_manager = self.sm + auth_manager = init_auth_manager(app, self) + self.sm = auth_manager.security_manager self.bm = BabelManager(self) self._add_global_static() self._add_global_filters() @@ -659,11 +653,9 @@ class AirflowAppBuilder: def init_appbuilder(app: Flask) -> AirflowAppBuilder: """Init `Flask App Builder <https://flask-appbuilder.readthedocs.io/en/latest/>`__.""" - auth_manager = init_auth_manager(app) return AirflowAppBuilder( app=app, session=settings.Session, - security_manager_class=auth_manager.get_security_manager_override_class(), base_template="airflow/main.html", update_perms=conf.getboolean("webserver", "UPDATE_FAB_PERMS"), auth_rate_limited=conf.getboolean("webserver", "AUTH_RATE_LIMITED", fallback=True), diff --git a/airflow/www/extensions/init_auth_manager.py b/airflow/www/extensions/init_auth_manager.py index b84e45ae7f..63ec043c10 100644 --- a/airflow/www/extensions/init_auth_manager.py +++ b/airflow/www/extensions/init_auth_manager.py @@ -25,6 +25,7 @@ if TYPE_CHECKING: from flask import Flask from airflow.auth.managers.base_auth_manager import BaseAuthManager + from airflow.www.extensions.init_appbuilder import AirflowAppBuilder auth_manager: BaseAuthManager | None = None @@ -46,15 +47,15 @@ def get_auth_manager_cls() -> type[BaseAuthManager]: return auth_manager_cls -def init_auth_manager(app: Flask) -> BaseAuthManager: +def init_auth_manager(app: Flask, appbuilder: AirflowAppBuilder) -> BaseAuthManager: """ - Initialize the auth manager with the given flask app object. + Initialize the auth manager. Import the user manager class and instantiate it. """ global auth_manager auth_manager_cls = get_auth_manager_cls() - auth_manager = auth_manager_cls(app) + auth_manager = auth_manager_cls(app, appbuilder) return auth_manager diff --git a/airflow/www/security_manager.py b/airflow/www/security_manager.py index 63d1be3e8f..1d6bc592a5 100644 --- a/airflow/www/security_manager.py +++ b/airflow/www/security_manager.py @@ -16,15 +16,13 @@ # under the License. from __future__ import annotations -import itertools import warnings -from typing import TYPE_CHECKING, Any, Collection, Iterable, Sequence +from typing import TYPE_CHECKING, Any, Collection, Sequence from flask import g -from sqlalchemy import or_, select -from sqlalchemy.orm import joinedload +from sqlalchemy import select -from airflow.auth.managers.fab.models import Permission, Resource, Role +from airflow.auth.managers.fab.security_manager.constants import EXISTING_ROLES as FAB_EXISTING_ROLES from airflow.auth.managers.fab.views.permissions import ( ActionModelView, PermissionPairModelView, @@ -45,21 +43,17 @@ from airflow.auth.managers.fab.views.user_edit import ( ) from airflow.auth.managers.fab.views.user_stats import CustomUserStatsChartView from airflow.exceptions import AirflowException, RemovedInAirflow3Warning -from airflow.models import DagBag, DagModel +from airflow.models import DagModel from airflow.security import permissions from airflow.utils.log.logging_mixin import LoggingMixin from airflow.www.fab_security.sqla.manager import SecurityManager from airflow.www.utils import CustomSQLAInterface -EXISTING_ROLES = { - "Admin", - "Viewer", - "User", - "Op", - "Public", -} +EXISTING_ROLES = FAB_EXISTING_ROLES if TYPE_CHECKING: + from airflow.auth.managers.fab.models import Permission, Resource + pass @@ -163,7 +157,6 @@ class AirflowSecurityManagerV2(SecurityManager, LoggingMixin): ] # global resource for dag-level access - DAG_RESOURCES = {permissions.RESOURCE_DAG} DAG_ACTIONS = permissions.DAG_ACTIONS ########################################################################### @@ -222,39 +215,6 @@ class AirflowSecurityManagerV2(SecurityManager, LoggingMixin): return dm.root_dag_id or dm.dag_id return dag_id - def init_role(self, role_name, perms) -> None: - """ - Initialize the role with actions and related resources. - - :param role_name: - :param perms: - :return: - """ - warnings.warn( - "`init_role` has been deprecated. Please use `bulk_sync_roles` instead.", - RemovedInAirflow3Warning, - stacklevel=2, - ) - self.bulk_sync_roles([{"role": role_name, "perms": perms}]) - - def bulk_sync_roles(self, roles: Iterable[dict[str, Any]]) -> None: - """Sync the provided roles and permissions.""" - existing_roles = self._get_all_roles_with_permissions() - non_dag_perms = self._get_all_non_dag_permissions() - - for config in roles: - role_name = config["role"] - perms = config["perms"] - role = existing_roles.get(role_name) or self.add_role(role_name) - - for action_name, resource_name in perms: - perm = non_dag_perms.get((action_name, resource_name)) or self.create_permission( - action_name, resource_name - ) - - if perm not in role.permissions: - self.add_permission_to_role(role, perm) - @staticmethod def get_user_roles(user=None): """ @@ -308,150 +268,6 @@ class AirflowSecurityManagerV2(SecurityManager, LoggingMixin): return False - def clean_perms(self) -> None: - """FAB leaves faulty permissions that need to be cleaned up.""" - self.log.debug("Cleaning faulty perms") - sesh = self.appbuilder.get_session - perms = sesh.query(Permission).filter( - or_( - Permission.action == None, # noqa - Permission.resource == None, # noqa - ) - ) - # Since FAB doesn't define ON DELETE CASCADE on these tables, we need - # to delete the _object_ so that SQLA knows to delete the many-to-many - # relationship object too. :( - - deleted_count = 0 - for perm in perms: - sesh.delete(perm) - deleted_count += 1 - sesh.commit() - if deleted_count: - self.log.info("Deleted %s faulty permissions", deleted_count) - - def _merge_perm(self, action_name: str, resource_name: str) -> None: - """ - Add the new (action, resource) to assoc_permission_role if it doesn't exist. - - It will add the related entry to ab_permission and ab_resource two meta tables as well. - - :param action_name: Name of the action - :param resource_name: Name of the resource - :return: - """ - action = self.get_action(action_name) - resource = self.get_resource(resource_name) - perm = None - if action and resource: - perm = self.appbuilder.get_session.scalar( - select(self.permission_model).filter_by(action=action, resource=resource).limit(1) - ) - if not perm and action_name and resource_name: - self.create_permission(action_name, resource_name) - - def add_homepage_access_to_custom_roles(self) -> None: - """ - Add Website.can_read access to all custom roles. - - :return: None. - """ - website_permission = self.create_permission(permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE) - custom_roles = [role for role in self.get_all_roles() if role.name not in EXISTING_ROLES] - for role in custom_roles: - self.add_permission_to_role(role, website_permission) - - self.appbuilder.get_session.commit() - - def get_all_permissions(self) -> set[tuple[str, str]]: - """Return all permissions as a set of tuples with the action and resource names.""" - return set( - self.appbuilder.get_session.execute( - select(self.action_model.name, self.resource_model.name) - .join(self.permission_model.action) - .join(self.permission_model.resource) - ) - ) - - def _get_all_non_dag_permissions(self) -> dict[tuple[str, str], Permission]: - """ - Get permissions except those that are for specific DAGs. - - Returns a dict with a key of (action_name, resource_name) and value of permission - with all permissions except those that are for specific DAGs. - """ - return { - (action_name, resource_name): viewmodel - for action_name, resource_name, viewmodel in ( - self.appbuilder.get_session.execute( - select(self.action_model.name, self.resource_model.name, self.permission_model) - .join(self.permission_model.action) - .join(self.permission_model.resource) - .where(~self.resource_model.name.like(f"{permissions.RESOURCE_DAG_PREFIX}%")) - ) - ) - } - - def _get_all_roles_with_permissions(self) -> dict[str, Role]: - """Return a dict with a key of role name and value of role with early loaded permissions.""" - return { - r.name: r - for r in self.appbuilder.get_session.scalars( - select(self.role_model).options(joinedload(self.role_model.permissions)) - ).unique() - } - - def create_dag_specific_permissions(self) -> None: - """ - Add permissions to all DAGs. - - Creates 'can_read', 'can_edit', and 'can_delete' permissions for all - DAGs, along with any `access_control` permissions provided in them. - - This does iterate through ALL the DAGs, which can be slow. See `sync_perm_for_dag` - if you only need to sync a single DAG. - - :return: None. - """ - perms = self.get_all_permissions() - dagbag = DagBag(read_dags_from_db=True) - dagbag.collect_dags_from_db() - dags = dagbag.dags.values() - - for dag in dags: - root_dag_id = dag.parent_dag.dag_id if dag.parent_dag else dag.dag_id - dag_resource_name = permissions.resource_name_for_dag(root_dag_id) - for action_name in self.DAG_ACTIONS: - if (action_name, dag_resource_name) not in perms: - self._merge_perm(action_name, dag_resource_name) - - if dag.access_control is not None: - self.sync_perm_for_dag(dag_resource_name, dag.access_control) - - def update_admin_permission(self) -> None: - """ - Add missing permissions to the table for admin. - - Admin should get all the permissions, except the dag permissions - because Admin already has Dags permission. - Add the missing ones to the table for admin. - - :return: None. - """ - session = self.appbuilder.get_session - dag_resources = session.scalars( - select(Resource).where(Resource.name.like(f"{permissions.RESOURCE_DAG_PREFIX}%")) - ) - resource_ids = [resource.id for resource in dag_resources] - - perms = session.scalars(select(Permission).where(~Permission.resource_id.in_(resource_ids))) - perms = [p for p in perms if p.action and p.resource] - - admin = self.find_role("Admin") - admin.permissions = list(set(admin.permissions) | set(perms)) - - session.commit() - def create_admin_standalone(self) -> tuple[str | None, str | None]: """Perform the required steps when initializing airflow for standalone mode. @@ -459,36 +275,6 @@ class AirflowSecurityManagerV2(SecurityManager, LoggingMixin): """ return None, None - def sync_roles(self) -> None: - """ - Initialize default and custom roles with related permissions. - - 1. Init the default role(Admin, Viewer, User, Op, public) - with related permissions. - 2. Init the custom role(dag-user) with related permissions. - - :return: None. - """ - # Create global all-dag permissions - self.create_perm_vm_for_all_dag() - - # Sync the default roles (Admin, Viewer, User, Op, public) with related permissions - self.bulk_sync_roles(self.ROLE_CONFIGS) - - self.add_homepage_access_to_custom_roles() - # init existing roles, the rest role could be created through UI. - self.update_admin_permission() - self.clean_perms() - - def sync_resource_permissions(self, perms: Iterable[tuple[str, str]] | None = None) -> None: - """Populate resource-based permissions.""" - if not perms: - return - - for action_name, resource_name in perms: - self.create_resource(resource_name) - self.create_permission(action_name, resource_name) - def sync_perm_for_dag( self, dag_id: str, @@ -577,12 +363,6 @@ class AirflowSecurityManagerV2(SecurityManager, LoggingMixin): if dag_perm: self.add_permission_to_role(role, dag_perm) - def create_perm_vm_for_all_dag(self) -> None: - """Create perm-vm if not exist and insert into FAB security model for all-dags.""" - # create perm for global logical dag - for resource_name, action_name in itertools.product(self.DAG_RESOURCES, self.DAG_ACTIONS): - self._merge_perm(action_name, resource_name) - def check_authorization( self, perms: Sequence[tuple[str, str]] | None = None, diff --git a/tests/auth/managers/fab/api_endpoints/test_role_and_permission_endpoint.py b/tests/auth/managers/fab/api_endpoints/test_role_and_permission_endpoint.py index d55cce591a..80bff699ad 100644 --- a/tests/auth/managers/fab/api_endpoints/test_role_and_permission_endpoint.py +++ b/tests/auth/managers/fab/api_endpoints/test_role_and_permission_endpoint.py @@ -20,8 +20,8 @@ import pytest from airflow.api_connexion.exceptions import EXCEPTIONS_LINK_MAP from airflow.auth.managers.fab.models import Role +from airflow.auth.managers.fab.security_manager.constants import EXISTING_ROLES from airflow.security import permissions -from airflow.www.security_manager import EXISTING_ROLES from tests.test_utils.api_connexion_utils import ( assert_401, create_role, diff --git a/tests/auth/managers/fab/test_fab_auth_manager.py b/tests/auth/managers/fab/test_fab_auth_manager.py index 2e545457dd..d9f58fded1 100644 --- a/tests/auth/managers/fab/test_fab_auth_manager.py +++ b/tests/auth/managers/fab/test_fab_auth_manager.py @@ -21,6 +21,7 @@ from unittest import mock from unittest.mock import Mock import pytest +from flask import Flask from airflow.auth.managers.fab.fab_auth_manager import FabAuthManager from airflow.auth.managers.fab.models import User @@ -43,7 +44,7 @@ from airflow.security.permissions import ( RESOURCE_VARIABLE, RESOURCE_WEBSITE, ) -from airflow.www.security_appless import ApplessAirflowSecurityManager +from airflow.www.extensions.init_appbuilder import init_appbuilder IS_AUTHORIZED_METHODS_SIMPLE = { "is_authorized_configuration": RESOURCE_CONFIG, @@ -56,13 +57,17 @@ IS_AUTHORIZED_METHODS_SIMPLE = { @pytest.fixture def auth_manager(): - app_mock = Mock(name="flask_app") - app_mock.config.get.return_value = None # this is called to get the security manager override (if any) - auth_manager = FabAuthManager(app_mock) - auth_manager.security_manager = ApplessAirflowSecurityManager() - return auth_manager + return FabAuthManager(None, None) +@pytest.fixture +def auth_manager_with_appbuilder(): + flask_app = Flask(__name__) + appbuilder = init_appbuilder(flask_app) + return FabAuthManager(flask_app, appbuilder) + + +@pytest.mark.db_test class TestFabAuthManager: @pytest.mark.parametrize( "id,first_name,last_name,username,email,expected", @@ -346,47 +351,55 @@ class TestFabAuthManager: result = auth_manager.is_authorized_website(user=user) assert result == expected_result - def test_get_security_manager_override_class_return_fab_security_manager_override(self, auth_manager): - assert auth_manager.get_security_manager_override_class() is FabAirflowSecurityManagerOverride + @pytest.mark.db_test + def test_security_manager_return_fab_security_manager_override(self, auth_manager_with_appbuilder): + assert isinstance(auth_manager_with_appbuilder.security_manager, FabAirflowSecurityManagerOverride) - def test_get_url_login_when_auth_view_not_defined(self, auth_manager): + @pytest.mark.db_test + def test_get_url_login_when_auth_view_not_defined(self, auth_manager_with_appbuilder): with pytest.raises(AirflowException, match="`auth_view` not defined in the security manager."): - auth_manager.get_url_login() + auth_manager_with_appbuilder.get_url_login() + @pytest.mark.db_test @mock.patch("airflow.auth.managers.fab.fab_auth_manager.url_for") - def test_get_url_login(self, mock_url_for, auth_manager): - auth_manager.security_manager.auth_view = Mock() - auth_manager.security_manager.auth_view.endpoint = "test_endpoint" - auth_manager.get_url_login() + def test_get_url_login(self, mock_url_for, auth_manager_with_appbuilder): + auth_manager_with_appbuilder.security_manager.auth_view = Mock() + auth_manager_with_appbuilder.security_manager.auth_view.endpoint = "test_endpoint" + auth_manager_with_appbuilder.get_url_login() mock_url_for.assert_called_once_with("test_endpoint.login") + @pytest.mark.db_test @mock.patch("airflow.auth.managers.fab.fab_auth_manager.url_for") - def test_get_url_login_with_next(self, mock_url_for, auth_manager): - auth_manager.security_manager.auth_view = Mock() - auth_manager.security_manager.auth_view.endpoint = "test_endpoint" - auth_manager.get_url_login(next_url="next_url") + def test_get_url_login_with_next(self, mock_url_for, auth_manager_with_appbuilder): + auth_manager_with_appbuilder.security_manager.auth_view = Mock() + auth_manager_with_appbuilder.security_manager.auth_view.endpoint = "test_endpoint" + auth_manager_with_appbuilder.get_url_login(next_url="next_url") mock_url_for.assert_called_once_with("test_endpoint.login", next="next_url") - def test_get_url_logout_when_auth_view_not_defined(self, auth_manager): + @pytest.mark.db_test + def test_get_url_logout_when_auth_view_not_defined(self, auth_manager_with_appbuilder): with pytest.raises(AirflowException, match="`auth_view` not defined in the security manager."): - auth_manager.get_url_logout() + auth_manager_with_appbuilder.get_url_logout() + @pytest.mark.db_test @mock.patch("airflow.auth.managers.fab.fab_auth_manager.url_for") - def test_get_url_logout(self, mock_url_for, auth_manager): - auth_manager.security_manager.auth_view = Mock() - auth_manager.security_manager.auth_view.endpoint = "test_endpoint" - auth_manager.get_url_logout() + def test_get_url_logout(self, mock_url_for, auth_manager_with_appbuilder): + auth_manager_with_appbuilder.security_manager.auth_view = Mock() + auth_manager_with_appbuilder.security_manager.auth_view.endpoint = "test_endpoint" + auth_manager_with_appbuilder.get_url_logout() mock_url_for.assert_called_once_with("test_endpoint.logout") - def test_get_url_user_profile_when_auth_view_not_defined(self, auth_manager): - assert auth_manager.get_url_user_profile() is None + @pytest.mark.db_test + def test_get_url_user_profile_when_auth_view_not_defined(self, auth_manager_with_appbuilder): + assert auth_manager_with_appbuilder.get_url_user_profile() is None + @pytest.mark.db_test @mock.patch("airflow.auth.managers.fab.fab_auth_manager.url_for") - def test_get_url_user_profile(self, mock_url_for, auth_manager): + def test_get_url_user_profile(self, mock_url_for, auth_manager_with_appbuilder): expected_url = "test_url" mock_url_for.return_value = expected_url - auth_manager.security_manager.user_view = Mock() - auth_manager.security_manager.user_view.endpoint = "test_endpoint" - actual_url = auth_manager.get_url_user_profile() + auth_manager_with_appbuilder.security_manager.user_view = Mock() + auth_manager_with_appbuilder.security_manager.user_view.endpoint = "test_endpoint" + actual_url = auth_manager_with_appbuilder.get_url_user_profile() mock_url_for.assert_called_once_with("test_endpoint.userinfo") assert actual_url == expected_url diff --git a/tests/auth/managers/test_base_auth_manager.py b/tests/auth/managers/test_base_auth_manager.py index 416fa75e2a..35e23e631f 100644 --- a/tests/auth/managers/test_base_auth_manager.py +++ b/tests/auth/managers/test_base_auth_manager.py @@ -17,12 +17,13 @@ from __future__ import annotations from typing import TYPE_CHECKING +from unittest.mock import MagicMock, Mock import pytest +from flask import Flask from airflow.auth.managers.base_auth_manager import BaseAuthManager, ResourceMethod -from airflow.exceptions import AirflowException -from airflow.www.security_appless import ApplessAirflowSecurityManager +from airflow.www.extensions.init_appbuilder import init_appbuilder from airflow.www.security_manager import AirflowSecurityManagerV2 if TYPE_CHECKING: @@ -39,6 +40,9 @@ if TYPE_CHECKING: class EmptyAuthManager(BaseAuthManager): + def get_user_display_name(self) -> str: + raise NotImplementedError() + def get_user_name(self) -> str: raise NotImplementedError() @@ -112,18 +116,76 @@ class EmptyAuthManager(BaseAuthManager): @pytest.fixture def auth_manager(): - return EmptyAuthManager(None) + return EmptyAuthManager(None, None) -class TestBaseAuthManager: - def test_get_security_manager_override_class_return_empty_class(self, auth_manager): - assert auth_manager.get_security_manager_override_class() is AirflowSecurityManagerV2 +@pytest.fixture +def auth_manager_with_appbuilder(): + flask_app = Flask(__name__) + appbuilder = init_appbuilder(flask_app) + return EmptyAuthManager(flask_app, appbuilder) - def test_get_security_manager_not_defined(self, auth_manager): - with pytest.raises(AirflowException, match="Security manager not defined."): - _security_manager = auth_manager.security_manager - def test_get_security_manager_defined(self, auth_manager): - auth_manager.security_manager = ApplessAirflowSecurityManager() - _security_manager = auth_manager.security_manager - assert type(_security_manager) is ApplessAirflowSecurityManager +class TestBaseAuthManager: + def test_get_cli_commands_return_empty_list(self, auth_manager): + assert auth_manager.get_cli_commands() == [] + + def test_get_api_endpoints_return_none(self, auth_manager): + assert auth_manager.get_api_endpoints() is None + + @pytest.mark.db_test + def test_security_manager_return_default_security_manager(self, auth_manager_with_appbuilder): + assert isinstance(auth_manager_with_appbuilder.security_manager, AirflowSecurityManagerV2) + + @pytest.mark.parametrize( + "access_all, access_per_dag, dag_ids, expected", + [ + # Access to all dags + ( + True, + {}, + ["dag1", "dag2"], + {"dag1", "dag2"}, + ), + # No access to any dag + ( + False, + {}, + ["dag1", "dag2"], + set(), + ), + # Access to specific dags + ( + False, + {"dag1": True}, + ["dag1", "dag2"], + {"dag1"}, + ), + ], + ) + def test_get_permitted_dag_ids( + self, auth_manager, access_all: bool, access_per_dag: dict, dag_ids: list, expected: set + ): + def side_effect_func( + *, + method: ResourceMethod, + access_entity: DagAccessEntity | None = None, + details: DagDetails | None = None, + user: BaseUser | None = None, + ): + if not details: + return access_all + else: + return access_per_dag.get(details.id, False) + + auth_manager.is_authorized_dag = MagicMock(side_effect=side_effect_func) + user = Mock() + session = Mock() + dags = [] + for dag_id in dag_ids: + mock = Mock() + mock.dag_id = dag_id + dags.append(mock) + session.execute.return_value = dags + result = auth_manager.get_permitted_dag_ids(user=user, session=session) + assert result == expected diff --git a/tests/conftest.py b/tests/conftest.py index 4067d06f0f..8fd97e95a0 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -280,8 +280,8 @@ def initial_db_init(): from airflow.configuration import conf from airflow.utils import db - from airflow.www.app import sync_appbuilder_roles from airflow.www.extensions.init_appbuilder import init_appbuilder + from airflow.www.extensions.init_auth_manager import get_auth_manager db.resetdb() db.bootstrap_dagbag() @@ -289,7 +289,7 @@ def initial_db_init(): flask_app = Flask(__name__) flask_app.config["SQLALCHEMY_DATABASE_URI"] = conf.get("database", "SQL_ALCHEMY_CONN") init_appbuilder(flask_app) - sync_appbuilder_roles(flask_app) + get_auth_manager().init() @pytest.fixture(autouse=True, scope="session") diff --git a/tests/test_utils/api_connexion_utils.py b/tests/test_utils/api_connexion_utils.py index 836ad95d7b..83841acbcc 100644 --- a/tests/test_utils/api_connexion_utils.py +++ b/tests/test_utils/api_connexion_utils.py @@ -19,7 +19,7 @@ from __future__ import annotations from contextlib import contextmanager from airflow.api_connexion.exceptions import EXCEPTIONS_LINK_MAP -from airflow.www.security_manager import EXISTING_ROLES +from airflow.auth.managers.fab.security_manager.constants import EXISTING_ROLES @contextmanager diff --git a/tests/test_utils/decorators.py b/tests/test_utils/decorators.py index f6d4b91d91..5b028c694a 100644 --- a/tests/test_utils/decorators.py +++ b/tests/test_utils/decorators.py @@ -42,7 +42,6 @@ def dont_initialize_flask_app_submodules(_func=None, *, skip_all_except=None): "init_api_experimental", "init_api_auth_provider", "init_api_error_handlers", - "sync_appbuilder_roles", "init_jinja_globals", "init_xframe_protection", "init_airflow_session_interface", diff --git a/tests/www/test_security.py b/tests/www/test_security.py index 6f6e3ef35b..641b394808 100644 --- a/tests/www/test_security.py +++ b/tests/www/test_security.py @@ -892,7 +892,9 @@ def test_create_dag_specific_permissions(session, security_manager, monkeypatch, dagbag_class_mock.return_value = dagbag_mock import airflow.www.security - monkeypatch.setitem(airflow.www.security_manager.__dict__, "DagBag", dagbag_class_mock) + monkeypatch.setitem( + airflow.auth.managers.fab.security_manager.override.__dict__, "DagBag", dagbag_class_mock + ) security_manager._sync_dag_view_permissions = mock.Mock() for dag in sample_dags: