This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c2046245c0 Move methods used only in FAB from security managers to 
auth manager (#35203)
c2046245c0 is described below

commit c2046245c07fdd6eb05b996cc67c203c5ac456b6
Author: Vincent <97131062+vincb...@users.noreply.github.com>
AuthorDate: Wed Nov 1 15:59:53 2023 -0400

    Move methods used only in FAB from security managers to auth manager 
(#35203)
---
 airflow/auth/managers/base_auth_manager.py         |  50 ++---
 .../auth/managers/fab/cli_commands/role_command.py |   2 +-
 airflow/auth/managers/fab/fab_auth_manager.py      |  30 ++-
 .../managers/fab/security_manager/constants.py     |  26 +++
 .../auth/managers/fab/security_manager/override.py | 226 ++++++++++++++++++--
 airflow/www/app.py                                 |  13 +-
 airflow/www/extensions/init_appbuilder.py          |  14 +-
 airflow/www/extensions/init_auth_manager.py        |   7 +-
 airflow/www/security_manager.py                    | 234 +--------------------
 .../test_role_and_permission_endpoint.py           |   2 +-
 tests/auth/managers/fab/test_fab_auth_manager.py   |  73 ++++---
 tests/auth/managers/test_base_auth_manager.py      |  88 ++++++--
 tests/conftest.py                                  |   4 +-
 tests/test_utils/api_connexion_utils.py            |   2 +-
 tests/test_utils/decorators.py                     |   1 -
 tests/www/test_security.py                         |   4 +-
 16 files changed, 426 insertions(+), 350 deletions(-)

diff --git a/airflow/auth/managers/base_auth_manager.py 
b/airflow/auth/managers/base_auth_manager.py
index 2a5b1312a0..7d79fcce3e 100644
--- a/airflow/auth/managers/base_auth_manager.py
+++ b/airflow/auth/managers/base_auth_manager.py
@@ -18,11 +18,14 @@
 from __future__ import annotations
 
 from abc import abstractmethod
+from functools import cached_property
 from typing import TYPE_CHECKING, Container, Literal
 
 from sqlalchemy import select
 
-from airflow.exceptions import AirflowException
+from airflow.auth.managers.models.resource_details import (
+    DagDetails,
+)
 from airflow.models import DagModel
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.session import NEW_SESSION, provide_session
@@ -37,12 +40,12 @@ if TYPE_CHECKING:
         ConfigurationDetails,
         ConnectionDetails,
         DagAccessEntity,
-        DagDetails,
         DatasetDetails,
         PoolDetails,
         VariableDetails,
     )
     from airflow.cli.cli_config import CLICommand
+    from airflow.www.extensions.init_appbuilder import AirflowAppBuilder
     from airflow.www.security_manager import AirflowSecurityManagerV2
 
 ResourceMethod = Literal["GET", "POST", "PUT", "DELETE"]
@@ -55,9 +58,10 @@ class BaseAuthManager(LoggingMixin):
     Auth managers are responsible for any user management related operation 
such as login, logout, authz, ...
     """
 
-    def __init__(self, app: Flask) -> None:
-        self._security_manager: AirflowSecurityManagerV2 | None = None
+    def __init__(self, app: Flask, appbuilder: AirflowAppBuilder) -> None:
+        super().__init__()
         self.app = app
+        self.appbuilder = appbuilder
 
     @staticmethod
     def get_cli_commands() -> list[CLICommand]:
@@ -87,6 +91,13 @@ class BaseAuthManager(LoggingMixin):
     def get_user_id(self) -> str:
         """Return the user ID associated to the user in session."""
 
+    def init(self) -> None:
+        """
+        Run operations when Airflow is initializing.
+
+        By default, do nothing.
+        """
+
     @abstractmethod
     def is_logged_in(self) -> bool:
         """Return whether the user is logged in."""
@@ -267,32 +278,17 @@ class BaseAuthManager(LoggingMixin):
     def get_url_user_profile(self) -> str | None:
         """Return the url to a page displaying info about the current user."""
 
-    def get_security_manager_override_class(self) -> type:
+    @cached_property
+    def security_manager(self) -> AirflowSecurityManagerV2:
         """
-        Return the security manager override class.
+        Return the security manager.
 
-        The security manager override class is responsible for overriding the 
default security manager
-        class airflow.www.security_manager.AirflowSecurityManagerV2 with a 
custom implementation.
-        This class is essentially inherited from 
airflow.www.security_manager.AirflowSecurityManagerV2.
+        By default, Airflow comes with the default security manager
+        airflow.www.security_manager.AirflowSecurityManagerV2. The auth 
manager might need to extend this
+        default security manager for its own purposes.
 
-        By default, return the generic AirflowSecurityManagerV2.
+        By default, return the default AirflowSecurityManagerV2.
         """
         from airflow.www.security_manager import AirflowSecurityManagerV2
 
-        return AirflowSecurityManagerV2
-
-    @property
-    def security_manager(self) -> AirflowSecurityManagerV2:
-        """Get the security manager."""
-        if not self._security_manager:
-            raise AirflowException("Security manager not defined.")
-        return self._security_manager
-
-    @security_manager.setter
-    def security_manager(self, security_manager: AirflowSecurityManagerV2):
-        """
-        Set the security manager.
-
-        :param security_manager: the security manager
-        """
-        self._security_manager = security_manager
+        return AirflowSecurityManagerV2(self.appbuilder)
diff --git a/airflow/auth/managers/fab/cli_commands/role_command.py 
b/airflow/auth/managers/fab/cli_commands/role_command.py
index 59ced49852..5738a2e413 100644
--- a/airflow/auth/managers/fab/cli_commands/role_command.py
+++ b/airflow/auth/managers/fab/cli_commands/role_command.py
@@ -25,11 +25,11 @@ from collections import defaultdict
 from typing import TYPE_CHECKING
 
 from airflow.auth.managers.fab.cli_commands.utils import 
get_application_builder
+from airflow.auth.managers.fab.security_manager.constants import EXISTING_ROLES
 from airflow.cli.simple_table import AirflowConsole
 from airflow.utils import cli as cli_utils
 from airflow.utils.cli import suppress_logs_and_warning
 from airflow.utils.providers_configuration_loader import 
providers_configuration_loaded
-from airflow.www.security_manager import EXISTING_ROLES
 
 if TYPE_CHECKING:
     from airflow.auth.managers.fab.models import Action, Permission, Resource, 
Role
diff --git a/airflow/auth/managers/fab/fab_auth_manager.py 
b/airflow/auth/managers/fab/fab_auth_manager.py
index 1b98c33532..1b65e3b76d 100644
--- a/airflow/auth/managers/fab/fab_auth_manager.py
+++ b/airflow/auth/managers/fab/fab_auth_manager.py
@@ -18,6 +18,7 @@
 from __future__ import annotations
 
 import warnings
+from functools import cached_property
 from pathlib import Path
 from typing import TYPE_CHECKING, Container
 
@@ -82,6 +83,7 @@ from airflow.utils.yaml import safe_load
 from airflow.www.extensions.init_views import 
_CustomErrorRequestBodyValidator, _LazyResolver
 
 if TYPE_CHECKING:
+    from airflow.auth.managers.fab.security_manager.override import 
FabAirflowSecurityManagerOverride
     from airflow.auth.managers.models.base_user import BaseUser
     from airflow.cli.cli_config import (
         CLICommand,
@@ -180,6 +182,10 @@ class FabAuthManager(BaseAuthManager):
         """Return the user ID associated to the user in session."""
         return str(self.get_user().get_id())
 
+    def init(self) -> None:
+        """Run operations when Airflow is initializing."""
+        self._sync_appbuilder_roles()
+
     def is_logged_in(self) -> bool:
         """Return whether the user is logged in."""
         return not self.get_user().is_anonymous
@@ -328,8 +334,9 @@ class FabAuthManager(BaseAuthManager):
             for dag in 
session.execute(select(DagModel.dag_id).where(DagModel.dag_id.in_(resources)))
         }
 
-    def get_security_manager_override_class(self) -> type:
-        """Return the security manager override."""
+    @cached_property
+    def security_manager(self) -> FabAirflowSecurityManagerOverride:
+        """Return the security manager specific to FAB."""
         from airflow.auth.managers.fab.security_manager.override import 
FabAirflowSecurityManagerOverride
         from airflow.www.security import AirflowSecurityManager
 
@@ -346,9 +353,9 @@ class FabAuthManager(BaseAuthManager):
                     "FabAirflowSecurityManagerOverride instead of 
AirflowSecurityManager.",
                     DeprecationWarning,
                 )
-            return sm_from_config
+            return sm_from_config(self.appbuilder)
 
-        return FabAirflowSecurityManagerOverride  # default choice
+        return FabAirflowSecurityManagerOverride(self.appbuilder)
 
     def get_url_login(self, **kwargs) -> str:
         """Return the login page url."""
@@ -489,7 +496,20 @@ class FabAuthManager(BaseAuthManager):
         :meta private:
         """
         if "." in dag_id:
-            return self.security_manager.appbuilder.get_session.scalar(
+            return self.appbuilder.get_session.scalar(
                 select(DagModel.dag_id, 
DagModel.root_dag_id).where(DagModel.dag_id == dag_id).limit(1)
             )
         return dag_id
+
+    def _sync_appbuilder_roles(self):
+        """
+        Sync appbuilder roles to DB.
+
+        :meta private:
+        """
+        # Garbage collect old permissions/views after they have been modified.
+        # Otherwise, when the name of a view or menu is changed, the framework
+        # will add the new Views and Menus names to the backend, but will not
+        # delete the old ones.
+        if conf.getboolean("webserver", "UPDATE_FAB_PERMS"):
+            self.security_manager.sync_roles()
diff --git a/airflow/auth/managers/fab/security_manager/constants.py 
b/airflow/auth/managers/fab/security_manager/constants.py
new file mode 100644
index 0000000000..fd600ec267
--- /dev/null
+++ b/airflow/auth/managers/fab/security_manager/constants.py
@@ -0,0 +1,26 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+EXISTING_ROLES = {
+    "Admin",
+    "Viewer",
+    "User",
+    "Op",
+    "Public",
+}
diff --git a/airflow/auth/managers/fab/security_manager/override.py 
b/airflow/auth/managers/fab/security_manager/override.py
index 9f994f5c50..9bc34912f2 100644
--- a/airflow/auth/managers/fab/security_manager/override.py
+++ b/airflow/auth/managers/fab/security_manager/override.py
@@ -18,6 +18,7 @@
 from __future__ import annotations
 
 import datetime
+import itertools
 import logging
 import os
 import random
@@ -49,8 +50,9 @@ from flask_jwt_extended import JWTManager, current_user as 
current_user_jwt
 from flask_login import LoginManager
 from itsdangerous import want_bytes
 from markupsafe import Markup
-from sqlalchemy import and_, func, inspect, select
+from sqlalchemy import and_, func, inspect, or_, select
 from sqlalchemy.exc import MultipleResultsFound
+from sqlalchemy.orm import Session, joinedload
 from werkzeug.security import check_password_hash, generate_password_hash
 
 from airflow.auth.managers.fab.fab_auth_manager import 
MAP_METHOD_NAME_TO_FAB_ACTION_NAME
@@ -63,9 +65,10 @@ from airflow.auth.managers.fab.models import (
     assoc_permission_role,
 )
 from airflow.auth.managers.fab.models.anonymous_user import AnonymousUser
+from airflow.auth.managers.fab.security_manager.constants import EXISTING_ROLES
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
-from airflow.models import DagModel
+from airflow.models import DagBag, DagModel
 from airflow.security import permissions
 from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.www.extensions.init_auth_manager import get_auth_manager
@@ -73,8 +76,6 @@ from airflow.www.security_manager import 
AirflowSecurityManagerV2
 from airflow.www.session import AirflowDatabaseSessionInterface
 
 if TYPE_CHECKING:
-    from sqlalchemy.orm import Session
-
     from airflow.auth.managers.base_auth_manager import ResourceMethod
     from airflow.auth.managers.fab.models import User
     from airflow.www.fab_security.manager import BaseSecurityManager
@@ -126,6 +127,9 @@ class 
FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2):
     oauth_allow_list: dict[str, list] = {}
     """ Initialized (remote_app) providers dict {'provider_name', OBJ } """
 
+    # global resource for dag-level access
+    DAG_RESOURCES = {permissions.RESOURCE_DAG}
+
     def __init__(self, appbuilder):
         # done in super, but we need it before we can call super.
         self.appbuilder = appbuilder
@@ -760,6 +764,160 @@ class 
FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2):
             return any(self.get_readable_dag_ids(user))
         return any(self.get_editable_dag_ids(user))
 
+    def get_all_permissions(self) -> set[tuple[str, str]]:
+        """Return all permissions as a set of tuples with the action and 
resource names."""
+        return set(
+            self.appbuilder.get_session.execute(
+                select(self.action_model.name, self.resource_model.name)
+                .join(self.permission_model.action)
+                .join(self.permission_model.resource)
+            )
+        )
+
+    def create_dag_specific_permissions(self) -> None:
+        """
+        Add permissions to all DAGs.
+
+        Creates 'can_read', 'can_edit', and 'can_delete' permissions for all
+        DAGs, along with any `access_control` permissions provided in them.
+
+        This does iterate through ALL the DAGs, which can be slow. See 
`sync_perm_for_dag`
+        if you only need to sync a single DAG.
+        """
+        perms = self.get_all_permissions()
+        dagbag = DagBag(read_dags_from_db=True)
+        dagbag.collect_dags_from_db()
+        dags = dagbag.dags.values()
+
+        for dag in dags:
+            root_dag_id = dag.parent_dag.dag_id if dag.parent_dag else 
dag.dag_id
+            dag_resource_name = permissions.resource_name_for_dag(root_dag_id)
+            for action_name in self.DAG_ACTIONS:
+                if (action_name, dag_resource_name) not in perms:
+                    self._merge_perm(action_name, dag_resource_name)
+
+            if dag.access_control is not None:
+                self.sync_perm_for_dag(dag_resource_name, dag.access_control)
+
+    def sync_roles(self) -> None:
+        """
+        Initialize default and custom roles with related permissions.
+
+        1. Init the default role(Admin, Viewer, User, Op, public)
+           with related permissions.
+        2. Init the custom role(dag-user) with related permissions.
+        """
+        # Create global all-dag permissions
+        self.create_perm_vm_for_all_dag()
+
+        # Sync the default roles (Admin, Viewer, User, Op, public) with 
related permissions
+        self.bulk_sync_roles(self.ROLE_CONFIGS)
+
+        self.add_homepage_access_to_custom_roles()
+        # init existing roles, the rest role could be created through UI.
+        self.update_admin_permission()
+        self.clean_perms()
+
+    def create_perm_vm_for_all_dag(self) -> None:
+        """Create perm-vm if not exist and insert into FAB security model for 
all-dags."""
+        # create perm for global logical dag
+        for resource_name, action_name in 
itertools.product(self.DAG_RESOURCES, self.DAG_ACTIONS):
+            self._merge_perm(action_name, resource_name)
+
+    def add_homepage_access_to_custom_roles(self) -> None:
+        """Add Website.can_read access to all custom roles."""
+        website_permission = 
self.create_permission(permissions.ACTION_CAN_READ, 
permissions.RESOURCE_WEBSITE)
+        custom_roles = [role for role in self.get_all_roles() if role.name not 
in EXISTING_ROLES]
+        for role in custom_roles:
+            self.add_permission_to_role(role, website_permission)
+
+        self.appbuilder.get_session.commit()
+
+    def update_admin_permission(self) -> None:
+        """
+        Add missing permissions to the table for admin.
+
+        Admin should get all the permissions, except the dag permissions
+        because Admin already has Dags permission.
+        Add the missing ones to the table for admin.
+        """
+        session = self.appbuilder.get_session
+        dag_resources = session.scalars(
+            
select(Resource).where(Resource.name.like(f"{permissions.RESOURCE_DAG_PREFIX}%"))
+        )
+        resource_ids = [resource.id for resource in dag_resources]
+
+        perms = 
session.scalars(select(Permission).where(~Permission.resource_id.in_(resource_ids)))
+        perms = [p for p in perms if p.action and p.resource]
+
+        admin = self.find_role("Admin")
+        admin.permissions = list(set(admin.permissions) | set(perms))
+
+        session.commit()
+
+    def clean_perms(self) -> None:
+        """FAB leaves faulty permissions that need to be cleaned up."""
+        self.log.debug("Cleaning faulty perms")
+        sesh = self.appbuilder.get_session
+        perms = sesh.query(Permission).filter(
+            or_(
+                Permission.action == None,  # noqa
+                Permission.resource == None,  # noqa
+            )
+        )
+        # Since FAB doesn't define ON DELETE CASCADE on these tables, we need
+        # to delete the _object_ so that SQLA knows to delete the many-to-many
+        # relationship object too. :(
+
+        deleted_count = 0
+        for perm in perms:
+            sesh.delete(perm)
+            deleted_count += 1
+        sesh.commit()
+        if deleted_count:
+            self.log.info("Deleted %s faulty permissions", deleted_count)
+
+    def init_role(self, role_name, perms) -> None:
+        """
+        Initialize the role with actions and related resources.
+
+        :param role_name:
+        :param perms:
+        """
+        warnings.warn(
+            "`init_role` has been deprecated. Please use `bulk_sync_roles` 
instead.",
+            RemovedInAirflow3Warning,
+            stacklevel=2,
+        )
+        self.bulk_sync_roles([{"role": role_name, "perms": perms}])
+
+    def bulk_sync_roles(self, roles: Iterable[dict[str, Any]]) -> None:
+        """Sync the provided roles and permissions."""
+        existing_roles = self._get_all_roles_with_permissions()
+        non_dag_perms = self._get_all_non_dag_permissions()
+
+        for config in roles:
+            role_name = config["role"]
+            perms = config["perms"]
+            role = existing_roles.get(role_name) or self.add_role(role_name)
+
+            for action_name, resource_name in perms:
+                perm = non_dag_perms.get((action_name, resource_name)) or 
self.create_permission(
+                    action_name, resource_name
+                )
+
+                if perm not in role.permissions:
+                    self.add_permission_to_role(role, perm)
+
+    def sync_resource_permissions(self, perms: Iterable[tuple[str, str]] | 
None = None) -> None:
+        """Populate resource-based permissions."""
+        if not perms:
+            return
+
+        for action_name, resource_name in perms:
+            self.create_resource(resource_name)
+            self.create_permission(action_name, resource_name)
+
     """
     -----------
     Role entity
@@ -833,7 +991,6 @@ class 
FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2):
         - we use AUTH_ROLES_MAPPING to map from keys, to FAB role names
 
         :param role_keys: the list of FAB role keys
-        :return: a list of Role
         """
         _roles = set()
         _role_keys = set(role_keys)
@@ -1021,7 +1178,6 @@ class 
FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2):
         Gets an existing action record.
 
         :param name: name
-        :return: Action record, if it exists
         """
         return 
self.get_session.query(self.action_model).filter_by(name=name).one_or_none()
 
@@ -1091,7 +1247,6 @@ class 
FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2):
         Create a resource with the given name.
 
         :param name: The name of the resource to create created.
-        :return: The FAB resource created.
         """
         resource = self.get_resource(name)
         if resource is None:
@@ -1107,11 +1262,7 @@ class 
FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2):
         return resource
 
     def get_all_resources(self) -> list[Resource]:
-        """
-        Gets all existing resource records.
-
-        :return: List of all resources
-        """
+        """Gets all existing resource records."""
         return self.get_session.query(self.resource_model).all()
 
     def delete_resource(self, name: str) -> bool:
@@ -1158,7 +1309,6 @@ class 
FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2):
 
         :param action_name: Name of action
         :param resource_name: Name of resource
-        :return: The existing permission
         """
         action = self.get_action(action_name)
         resource = self.get_resource(resource_name)
@@ -1175,7 +1325,6 @@ class 
FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2):
         Retrieve permission pairs associated with a specific resource object.
 
         :param resource: Object representing a single resource.
-        :return: Action objects representing resource->action pair
         """
         return 
self.get_session.query(self.permission_model).filter_by(resource_id=resource.id).all()
 
@@ -1215,7 +1364,6 @@ class 
FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2):
 
         :param action_name: Name of existing action
         :param resource_name: Name of existing resource
-        :return: None
         """
         if not (action_name and resource_name):
             return
@@ -1246,7 +1394,6 @@ class 
FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2):
 
         :param role: The role about to get a new permission.
         :param permission: The permission pair to add to a role.
-        :return: None
         """
         if permission and permission not in role.permissions:
             try:
@@ -2013,6 +2160,53 @@ class 
FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2):
         result.update(role_resource_names)
         return result
 
+    def _merge_perm(self, action_name: str, resource_name: str) -> None:
+        """
+        Add the new (action, resource) to assoc_permission_role if it doesn't 
exist.
+
+        It will add the related entry to ab_permission and ab_resource two 
meta tables as well.
+
+        :param action_name: Name of the action
+        :param resource_name: Name of the resource
+        """
+        action = self.get_action(action_name)
+        resource = self.get_resource(resource_name)
+        perm = None
+        if action and resource:
+            perm = self.appbuilder.get_session.scalar(
+                select(self.permission_model).filter_by(action=action, 
resource=resource).limit(1)
+            )
+        if not perm and action_name and resource_name:
+            self.create_permission(action_name, resource_name)
+
+    def _get_all_roles_with_permissions(self) -> dict[str, Role]:
+        """Return a dict with a key of role name and value of role with early 
loaded permissions."""
+        return {
+            r.name: r
+            for r in self.appbuilder.get_session.scalars(
+                
select(self.role_model).options(joinedload(self.role_model.permissions))
+            ).unique()
+        }
+
+    def _get_all_non_dag_permissions(self) -> dict[tuple[str, str], 
Permission]:
+        """
+        Get permissions except those that are for specific DAGs.
+
+        Returns a dict with a key of (action_name, resource_name) and value of 
permission
+        with all permissions except those that are for specific DAGs.
+        """
+        return {
+            (action_name, resource_name): viewmodel
+            for action_name, resource_name, viewmodel in (
+                self.appbuilder.get_session.execute(
+                    select(self.action_model.name, self.resource_model.name, 
self.permission_model)
+                    .join(self.permission_model.action)
+                    .join(self.permission_model.resource)
+                    
.where(~self.resource_model.name.like(f"{permissions.RESOURCE_DAG_PREFIX}%"))
+                )
+            )
+        }
+
     def filter_roles_by_perm_with_action(self, action_name: str, role_ids: 
list[int]):
         """Find roles with permission."""
         return (
diff --git a/airflow/www/app.py b/airflow/www/app.py
index ac6b87e79d..b8be40a421 100644
--- a/airflow/www/app.py
+++ b/airflow/www/app.py
@@ -36,6 +36,7 @@ from airflow.settings import _ENABLE_AIP_44
 from airflow.utils.json import AirflowJsonProvider
 from airflow.www.extensions.init_appbuilder import init_appbuilder
 from airflow.www.extensions.init_appbuilder_links import init_appbuilder_links
+from airflow.www.extensions.init_auth_manager import get_auth_manager
 from airflow.www.extensions.init_cache import init_cache
 from airflow.www.extensions.init_dagbag import init_dagbag
 from airflow.www.extensions.init_jinja_globals import init_jinja_globals
@@ -67,16 +68,6 @@ app: Flask | None = None
 csrf = CSRFProtect()
 
 
-def sync_appbuilder_roles(flask_app):
-    """Sync appbuilder roles to DB."""
-    # Garbage collect old permissions/views after they have been modified.
-    # Otherwise, when the name of a view or menu is changed, the framework
-    # will add the new Views and Menus names to the backend, but will not
-    # delete the old ones.
-    if conf.getboolean("webserver", "UPDATE_FAB_PERMS"):
-        flask_app.appbuilder.sm.sync_roles()
-
-
 def create_app(config=None, testing=False):
     """Create a new instance of Airflow WWW app."""
     flask_app = Flask(__name__)
@@ -174,7 +165,7 @@ def create_app(config=None, testing=False):
         init_api_auth_provider(flask_app)
         init_api_error_handlers(flask_app)  # needs to be after all api inits 
to let them add their path first
 
-        sync_appbuilder_roles(flask_app)
+        get_auth_manager().init()
 
         init_jinja_globals(flask_app)
         init_xframe_protection(flask_app)
diff --git a/airflow/www/extensions/init_appbuilder.py 
b/airflow/www/extensions/init_appbuilder.py
index acac0f2338..4503b81b43 100644
--- a/airflow/www/extensions/init_appbuilder.py
+++ b/airflow/www/extensions/init_appbuilder.py
@@ -96,12 +96,11 @@ class AirflowAppBuilder:
         app = Flask(__name__)
         app.config.from_object('config')
         dbmongo = MongoEngine(app)
-        appbuilder = AppBuilder(app, security_manager_class=SecurityManager)
+        appbuilder = AppBuilder(app)
     You can also create everything as an application factory.
     """
 
     baseviews: list[BaseView | Session] = []
-    security_manager_class = None
     # Flask app
     app = None
     # Database Session
@@ -132,7 +131,6 @@ class AirflowAppBuilder:
         base_template="airflow/main.html",
         static_folder="static/appbuilder",
         static_url_path="/appbuilder",
-        security_manager_class=None,
         update_perms=conf.getboolean("webserver", "UPDATE_FAB_PERMS"),
         auth_rate_limited=conf.getboolean("webserver", "AUTH_RATE_LIMITED", 
fallback=True),
         auth_rate_limit=conf.get("webserver", "AUTH_RATE_LIMIT", fallback="5 
per 40 second"),
@@ -152,8 +150,6 @@ class AirflowAppBuilder:
             optional, your override for the global static folder
         :param static_url_path:
             optional, your override for the global static url path
-        :param security_manager_class:
-            optional, pass your own security manager class
         :param update_perms:
             optional, update permissions flag (Boolean) you can use
             FAB_UPDATE_PERMS config key also
@@ -167,7 +163,6 @@ class AirflowAppBuilder:
         self.addon_managers = {}
         self.menu = menu
         self.base_template = base_template
-        self.security_manager_class = security_manager_class
         self.indexview = indexview
         self.static_folder = static_folder
         self.static_url_path = static_url_path
@@ -219,9 +214,8 @@ class AirflowAppBuilder:
 
         self._addon_managers = app.config["ADDON_MANAGERS"]
         self.session = session
-        self.sm = self.security_manager_class(self)
-        auth_manager = get_auth_manager()
-        auth_manager.security_manager = self.sm
+        auth_manager = init_auth_manager(app, self)
+        self.sm = auth_manager.security_manager
         self.bm = BabelManager(self)
         self._add_global_static()
         self._add_global_filters()
@@ -659,11 +653,9 @@ class AirflowAppBuilder:
 
 def init_appbuilder(app: Flask) -> AirflowAppBuilder:
     """Init `Flask App Builder 
<https://flask-appbuilder.readthedocs.io/en/latest/>`__."""
-    auth_manager = init_auth_manager(app)
     return AirflowAppBuilder(
         app=app,
         session=settings.Session,
-        
security_manager_class=auth_manager.get_security_manager_override_class(),
         base_template="airflow/main.html",
         update_perms=conf.getboolean("webserver", "UPDATE_FAB_PERMS"),
         auth_rate_limited=conf.getboolean("webserver", "AUTH_RATE_LIMITED", 
fallback=True),
diff --git a/airflow/www/extensions/init_auth_manager.py 
b/airflow/www/extensions/init_auth_manager.py
index b84e45ae7f..63ec043c10 100644
--- a/airflow/www/extensions/init_auth_manager.py
+++ b/airflow/www/extensions/init_auth_manager.py
@@ -25,6 +25,7 @@ if TYPE_CHECKING:
     from flask import Flask
 
     from airflow.auth.managers.base_auth_manager import BaseAuthManager
+    from airflow.www.extensions.init_appbuilder import AirflowAppBuilder
 
 auth_manager: BaseAuthManager | None = None
 
@@ -46,15 +47,15 @@ def get_auth_manager_cls() -> type[BaseAuthManager]:
     return auth_manager_cls
 
 
-def init_auth_manager(app: Flask) -> BaseAuthManager:
+def init_auth_manager(app: Flask, appbuilder: AirflowAppBuilder) -> 
BaseAuthManager:
     """
-    Initialize the auth manager with the given flask app object.
+    Initialize the auth manager.
 
     Import the user manager class and instantiate it.
     """
     global auth_manager
     auth_manager_cls = get_auth_manager_cls()
-    auth_manager = auth_manager_cls(app)
+    auth_manager = auth_manager_cls(app, appbuilder)
     return auth_manager
 
 
diff --git a/airflow/www/security_manager.py b/airflow/www/security_manager.py
index 63d1be3e8f..1d6bc592a5 100644
--- a/airflow/www/security_manager.py
+++ b/airflow/www/security_manager.py
@@ -16,15 +16,13 @@
 # under the License.
 from __future__ import annotations
 
-import itertools
 import warnings
-from typing import TYPE_CHECKING, Any, Collection, Iterable, Sequence
+from typing import TYPE_CHECKING, Any, Collection, Sequence
 
 from flask import g
-from sqlalchemy import or_, select
-from sqlalchemy.orm import joinedload
+from sqlalchemy import select
 
-from airflow.auth.managers.fab.models import Permission, Resource, Role
+from airflow.auth.managers.fab.security_manager.constants import 
EXISTING_ROLES as FAB_EXISTING_ROLES
 from airflow.auth.managers.fab.views.permissions import (
     ActionModelView,
     PermissionPairModelView,
@@ -45,21 +43,17 @@ from airflow.auth.managers.fab.views.user_edit import (
 )
 from airflow.auth.managers.fab.views.user_stats import CustomUserStatsChartView
 from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
-from airflow.models import DagBag, DagModel
+from airflow.models import DagModel
 from airflow.security import permissions
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.www.fab_security.sqla.manager import SecurityManager
 from airflow.www.utils import CustomSQLAInterface
 
-EXISTING_ROLES = {
-    "Admin",
-    "Viewer",
-    "User",
-    "Op",
-    "Public",
-}
+EXISTING_ROLES = FAB_EXISTING_ROLES
 
 if TYPE_CHECKING:
+    from airflow.auth.managers.fab.models import Permission, Resource
+
     pass
 
 
@@ -163,7 +157,6 @@ class AirflowSecurityManagerV2(SecurityManager, 
LoggingMixin):
     ]
 
     # global resource for dag-level access
-    DAG_RESOURCES = {permissions.RESOURCE_DAG}
     DAG_ACTIONS = permissions.DAG_ACTIONS
 
     ###########################################################################
@@ -222,39 +215,6 @@ class AirflowSecurityManagerV2(SecurityManager, 
LoggingMixin):
             return dm.root_dag_id or dm.dag_id
         return dag_id
 
-    def init_role(self, role_name, perms) -> None:
-        """
-        Initialize the role with actions and related resources.
-
-        :param role_name:
-        :param perms:
-        :return:
-        """
-        warnings.warn(
-            "`init_role` has been deprecated. Please use `bulk_sync_roles` 
instead.",
-            RemovedInAirflow3Warning,
-            stacklevel=2,
-        )
-        self.bulk_sync_roles([{"role": role_name, "perms": perms}])
-
-    def bulk_sync_roles(self, roles: Iterable[dict[str, Any]]) -> None:
-        """Sync the provided roles and permissions."""
-        existing_roles = self._get_all_roles_with_permissions()
-        non_dag_perms = self._get_all_non_dag_permissions()
-
-        for config in roles:
-            role_name = config["role"]
-            perms = config["perms"]
-            role = existing_roles.get(role_name) or self.add_role(role_name)
-
-            for action_name, resource_name in perms:
-                perm = non_dag_perms.get((action_name, resource_name)) or 
self.create_permission(
-                    action_name, resource_name
-                )
-
-                if perm not in role.permissions:
-                    self.add_permission_to_role(role, perm)
-
     @staticmethod
     def get_user_roles(user=None):
         """
@@ -308,150 +268,6 @@ class AirflowSecurityManagerV2(SecurityManager, 
LoggingMixin):
 
         return False
 
-    def clean_perms(self) -> None:
-        """FAB leaves faulty permissions that need to be cleaned up."""
-        self.log.debug("Cleaning faulty perms")
-        sesh = self.appbuilder.get_session
-        perms = sesh.query(Permission).filter(
-            or_(
-                Permission.action == None,  # noqa
-                Permission.resource == None,  # noqa
-            )
-        )
-        # Since FAB doesn't define ON DELETE CASCADE on these tables, we need
-        # to delete the _object_ so that SQLA knows to delete the many-to-many
-        # relationship object too. :(
-
-        deleted_count = 0
-        for perm in perms:
-            sesh.delete(perm)
-            deleted_count += 1
-        sesh.commit()
-        if deleted_count:
-            self.log.info("Deleted %s faulty permissions", deleted_count)
-
-    def _merge_perm(self, action_name: str, resource_name: str) -> None:
-        """
-        Add the new (action, resource) to assoc_permission_role if it doesn't 
exist.
-
-        It will add the related entry to ab_permission and ab_resource two 
meta tables as well.
-
-        :param action_name: Name of the action
-        :param resource_name: Name of the resource
-        :return:
-        """
-        action = self.get_action(action_name)
-        resource = self.get_resource(resource_name)
-        perm = None
-        if action and resource:
-            perm = self.appbuilder.get_session.scalar(
-                select(self.permission_model).filter_by(action=action, 
resource=resource).limit(1)
-            )
-        if not perm and action_name and resource_name:
-            self.create_permission(action_name, resource_name)
-
-    def add_homepage_access_to_custom_roles(self) -> None:
-        """
-        Add Website.can_read access to all custom roles.
-
-        :return: None.
-        """
-        website_permission = 
self.create_permission(permissions.ACTION_CAN_READ, 
permissions.RESOURCE_WEBSITE)
-        custom_roles = [role for role in self.get_all_roles() if role.name not 
in EXISTING_ROLES]
-        for role in custom_roles:
-            self.add_permission_to_role(role, website_permission)
-
-        self.appbuilder.get_session.commit()
-
-    def get_all_permissions(self) -> set[tuple[str, str]]:
-        """Return all permissions as a set of tuples with the action and 
resource names."""
-        return set(
-            self.appbuilder.get_session.execute(
-                select(self.action_model.name, self.resource_model.name)
-                .join(self.permission_model.action)
-                .join(self.permission_model.resource)
-            )
-        )
-
-    def _get_all_non_dag_permissions(self) -> dict[tuple[str, str], 
Permission]:
-        """
-        Get permissions except those that are for specific DAGs.
-
-        Returns a dict with a key of (action_name, resource_name) and value of 
permission
-        with all permissions except those that are for specific DAGs.
-        """
-        return {
-            (action_name, resource_name): viewmodel
-            for action_name, resource_name, viewmodel in (
-                self.appbuilder.get_session.execute(
-                    select(self.action_model.name, self.resource_model.name, 
self.permission_model)
-                    .join(self.permission_model.action)
-                    .join(self.permission_model.resource)
-                    
.where(~self.resource_model.name.like(f"{permissions.RESOURCE_DAG_PREFIX}%"))
-                )
-            )
-        }
-
-    def _get_all_roles_with_permissions(self) -> dict[str, Role]:
-        """Return a dict with a key of role name and value of role with early 
loaded permissions."""
-        return {
-            r.name: r
-            for r in self.appbuilder.get_session.scalars(
-                
select(self.role_model).options(joinedload(self.role_model.permissions))
-            ).unique()
-        }
-
-    def create_dag_specific_permissions(self) -> None:
-        """
-        Add permissions to all DAGs.
-
-        Creates 'can_read', 'can_edit', and 'can_delete' permissions for all
-        DAGs, along with any `access_control` permissions provided in them.
-
-        This does iterate through ALL the DAGs, which can be slow. See 
`sync_perm_for_dag`
-        if you only need to sync a single DAG.
-
-        :return: None.
-        """
-        perms = self.get_all_permissions()
-        dagbag = DagBag(read_dags_from_db=True)
-        dagbag.collect_dags_from_db()
-        dags = dagbag.dags.values()
-
-        for dag in dags:
-            root_dag_id = dag.parent_dag.dag_id if dag.parent_dag else 
dag.dag_id
-            dag_resource_name = permissions.resource_name_for_dag(root_dag_id)
-            for action_name in self.DAG_ACTIONS:
-                if (action_name, dag_resource_name) not in perms:
-                    self._merge_perm(action_name, dag_resource_name)
-
-            if dag.access_control is not None:
-                self.sync_perm_for_dag(dag_resource_name, dag.access_control)
-
-    def update_admin_permission(self) -> None:
-        """
-        Add missing permissions to the table for admin.
-
-        Admin should get all the permissions, except the dag permissions
-        because Admin already has Dags permission.
-        Add the missing ones to the table for admin.
-
-        :return: None.
-        """
-        session = self.appbuilder.get_session
-        dag_resources = session.scalars(
-            
select(Resource).where(Resource.name.like(f"{permissions.RESOURCE_DAG_PREFIX}%"))
-        )
-        resource_ids = [resource.id for resource in dag_resources]
-
-        perms = 
session.scalars(select(Permission).where(~Permission.resource_id.in_(resource_ids)))
-        perms = [p for p in perms if p.action and p.resource]
-
-        admin = self.find_role("Admin")
-        admin.permissions = list(set(admin.permissions) | set(perms))
-
-        session.commit()
-
     def create_admin_standalone(self) -> tuple[str | None, str | None]:
         """Perform the required steps when initializing airflow for standalone 
mode.
 
@@ -459,36 +275,6 @@ class AirflowSecurityManagerV2(SecurityManager, 
LoggingMixin):
         """
         return None, None
 
-    def sync_roles(self) -> None:
-        """
-        Initialize default and custom roles with related permissions.
-
-        1. Init the default role(Admin, Viewer, User, Op, public)
-           with related permissions.
-        2. Init the custom role(dag-user) with related permissions.
-
-        :return: None.
-        """
-        # Create global all-dag permissions
-        self.create_perm_vm_for_all_dag()
-
-        # Sync the default roles (Admin, Viewer, User, Op, public) with 
related permissions
-        self.bulk_sync_roles(self.ROLE_CONFIGS)
-
-        self.add_homepage_access_to_custom_roles()
-        # init existing roles, the rest role could be created through UI.
-        self.update_admin_permission()
-        self.clean_perms()
-
-    def sync_resource_permissions(self, perms: Iterable[tuple[str, str]] | 
None = None) -> None:
-        """Populate resource-based permissions."""
-        if not perms:
-            return
-
-        for action_name, resource_name in perms:
-            self.create_resource(resource_name)
-            self.create_permission(action_name, resource_name)
-
     def sync_perm_for_dag(
         self,
         dag_id: str,
@@ -577,12 +363,6 @@ class AirflowSecurityManagerV2(SecurityManager, 
LoggingMixin):
                 if dag_perm:
                     self.add_permission_to_role(role, dag_perm)
 
-    def create_perm_vm_for_all_dag(self) -> None:
-        """Create perm-vm if not exist and insert into FAB security model for 
all-dags."""
-        # create perm for global logical dag
-        for resource_name, action_name in 
itertools.product(self.DAG_RESOURCES, self.DAG_ACTIONS):
-            self._merge_perm(action_name, resource_name)
-
     def check_authorization(
         self,
         perms: Sequence[tuple[str, str]] | None = None,
diff --git 
a/tests/auth/managers/fab/api_endpoints/test_role_and_permission_endpoint.py 
b/tests/auth/managers/fab/api_endpoints/test_role_and_permission_endpoint.py
index d55cce591a..80bff699ad 100644
--- a/tests/auth/managers/fab/api_endpoints/test_role_and_permission_endpoint.py
+++ b/tests/auth/managers/fab/api_endpoints/test_role_and_permission_endpoint.py
@@ -20,8 +20,8 @@ import pytest
 
 from airflow.api_connexion.exceptions import EXCEPTIONS_LINK_MAP
 from airflow.auth.managers.fab.models import Role
+from airflow.auth.managers.fab.security_manager.constants import EXISTING_ROLES
 from airflow.security import permissions
-from airflow.www.security_manager import EXISTING_ROLES
 from tests.test_utils.api_connexion_utils import (
     assert_401,
     create_role,
diff --git a/tests/auth/managers/fab/test_fab_auth_manager.py 
b/tests/auth/managers/fab/test_fab_auth_manager.py
index 2e545457dd..d9f58fded1 100644
--- a/tests/auth/managers/fab/test_fab_auth_manager.py
+++ b/tests/auth/managers/fab/test_fab_auth_manager.py
@@ -21,6 +21,7 @@ from unittest import mock
 from unittest.mock import Mock
 
 import pytest
+from flask import Flask
 
 from airflow.auth.managers.fab.fab_auth_manager import FabAuthManager
 from airflow.auth.managers.fab.models import User
@@ -43,7 +44,7 @@ from airflow.security.permissions import (
     RESOURCE_VARIABLE,
     RESOURCE_WEBSITE,
 )
-from airflow.www.security_appless import ApplessAirflowSecurityManager
+from airflow.www.extensions.init_appbuilder import init_appbuilder
 
 IS_AUTHORIZED_METHODS_SIMPLE = {
     "is_authorized_configuration": RESOURCE_CONFIG,
@@ -56,13 +57,17 @@ IS_AUTHORIZED_METHODS_SIMPLE = {
 
 @pytest.fixture
 def auth_manager():
-    app_mock = Mock(name="flask_app")
-    app_mock.config.get.return_value = None  # this is called to get the 
security manager override (if any)
-    auth_manager = FabAuthManager(app_mock)
-    auth_manager.security_manager = ApplessAirflowSecurityManager()
-    return auth_manager
+    return FabAuthManager(None, None)
 
 
+@pytest.fixture
+def auth_manager_with_appbuilder():
+    flask_app = Flask(__name__)
+    appbuilder = init_appbuilder(flask_app)
+    return FabAuthManager(flask_app, appbuilder)
+
+
+@pytest.mark.db_test
 class TestFabAuthManager:
     @pytest.mark.parametrize(
         "id,first_name,last_name,username,email,expected",
@@ -346,47 +351,55 @@ class TestFabAuthManager:
         result = auth_manager.is_authorized_website(user=user)
         assert result == expected_result
 
-    def 
test_get_security_manager_override_class_return_fab_security_manager_override(self,
 auth_manager):
-        assert auth_manager.get_security_manager_override_class() is 
FabAirflowSecurityManagerOverride
+    @pytest.mark.db_test
+    def test_security_manager_return_fab_security_manager_override(self, 
auth_manager_with_appbuilder):
+        assert isinstance(auth_manager_with_appbuilder.security_manager, 
FabAirflowSecurityManagerOverride)
 
-    def test_get_url_login_when_auth_view_not_defined(self, auth_manager):
+    @pytest.mark.db_test
+    def test_get_url_login_when_auth_view_not_defined(self, 
auth_manager_with_appbuilder):
         with pytest.raises(AirflowException, match="`auth_view` not defined in 
the security manager."):
-            auth_manager.get_url_login()
+            auth_manager_with_appbuilder.get_url_login()
 
+    @pytest.mark.db_test
     @mock.patch("airflow.auth.managers.fab.fab_auth_manager.url_for")
-    def test_get_url_login(self, mock_url_for, auth_manager):
-        auth_manager.security_manager.auth_view = Mock()
-        auth_manager.security_manager.auth_view.endpoint = "test_endpoint"
-        auth_manager.get_url_login()
+    def test_get_url_login(self, mock_url_for, auth_manager_with_appbuilder):
+        auth_manager_with_appbuilder.security_manager.auth_view = Mock()
+        auth_manager_with_appbuilder.security_manager.auth_view.endpoint = 
"test_endpoint"
+        auth_manager_with_appbuilder.get_url_login()
         mock_url_for.assert_called_once_with("test_endpoint.login")
 
+    @pytest.mark.db_test
     @mock.patch("airflow.auth.managers.fab.fab_auth_manager.url_for")
-    def test_get_url_login_with_next(self, mock_url_for, auth_manager):
-        auth_manager.security_manager.auth_view = Mock()
-        auth_manager.security_manager.auth_view.endpoint = "test_endpoint"
-        auth_manager.get_url_login(next_url="next_url")
+    def test_get_url_login_with_next(self, mock_url_for, 
auth_manager_with_appbuilder):
+        auth_manager_with_appbuilder.security_manager.auth_view = Mock()
+        auth_manager_with_appbuilder.security_manager.auth_view.endpoint = 
"test_endpoint"
+        auth_manager_with_appbuilder.get_url_login(next_url="next_url")
         mock_url_for.assert_called_once_with("test_endpoint.login", 
next="next_url")
 
-    def test_get_url_logout_when_auth_view_not_defined(self, auth_manager):
+    @pytest.mark.db_test
+    def test_get_url_logout_when_auth_view_not_defined(self, 
auth_manager_with_appbuilder):
         with pytest.raises(AirflowException, match="`auth_view` not defined in 
the security manager."):
-            auth_manager.get_url_logout()
+            auth_manager_with_appbuilder.get_url_logout()
 
+    @pytest.mark.db_test
     @mock.patch("airflow.auth.managers.fab.fab_auth_manager.url_for")
-    def test_get_url_logout(self, mock_url_for, auth_manager):
-        auth_manager.security_manager.auth_view = Mock()
-        auth_manager.security_manager.auth_view.endpoint = "test_endpoint"
-        auth_manager.get_url_logout()
+    def test_get_url_logout(self, mock_url_for, auth_manager_with_appbuilder):
+        auth_manager_with_appbuilder.security_manager.auth_view = Mock()
+        auth_manager_with_appbuilder.security_manager.auth_view.endpoint = 
"test_endpoint"
+        auth_manager_with_appbuilder.get_url_logout()
         mock_url_for.assert_called_once_with("test_endpoint.logout")
 
-    def test_get_url_user_profile_when_auth_view_not_defined(self, 
auth_manager):
-        assert auth_manager.get_url_user_profile() is None
+    @pytest.mark.db_test
+    def test_get_url_user_profile_when_auth_view_not_defined(self, 
auth_manager_with_appbuilder):
+        assert auth_manager_with_appbuilder.get_url_user_profile() is None
 
+    @pytest.mark.db_test
     @mock.patch("airflow.auth.managers.fab.fab_auth_manager.url_for")
-    def test_get_url_user_profile(self, mock_url_for, auth_manager):
+    def test_get_url_user_profile(self, mock_url_for, 
auth_manager_with_appbuilder):
         expected_url = "test_url"
         mock_url_for.return_value = expected_url
-        auth_manager.security_manager.user_view = Mock()
-        auth_manager.security_manager.user_view.endpoint = "test_endpoint"
-        actual_url = auth_manager.get_url_user_profile()
+        auth_manager_with_appbuilder.security_manager.user_view = Mock()
+        auth_manager_with_appbuilder.security_manager.user_view.endpoint = 
"test_endpoint"
+        actual_url = auth_manager_with_appbuilder.get_url_user_profile()
         mock_url_for.assert_called_once_with("test_endpoint.userinfo")
         assert actual_url == expected_url
diff --git a/tests/auth/managers/test_base_auth_manager.py 
b/tests/auth/managers/test_base_auth_manager.py
index 416fa75e2a..35e23e631f 100644
--- a/tests/auth/managers/test_base_auth_manager.py
+++ b/tests/auth/managers/test_base_auth_manager.py
@@ -17,12 +17,13 @@
 from __future__ import annotations
 
 from typing import TYPE_CHECKING
+from unittest.mock import MagicMock, Mock
 
 import pytest
+from flask import Flask
 
 from airflow.auth.managers.base_auth_manager import BaseAuthManager, 
ResourceMethod
-from airflow.exceptions import AirflowException
-from airflow.www.security_appless import ApplessAirflowSecurityManager
+from airflow.www.extensions.init_appbuilder import init_appbuilder
 from airflow.www.security_manager import AirflowSecurityManagerV2
 
 if TYPE_CHECKING:
@@ -39,6 +40,9 @@ if TYPE_CHECKING:
 
 
 class EmptyAuthManager(BaseAuthManager):
+    def get_user_display_name(self) -> str:
+        raise NotImplementedError()
+
     def get_user_name(self) -> str:
         raise NotImplementedError()
 
@@ -112,18 +116,76 @@ class EmptyAuthManager(BaseAuthManager):
 
 @pytest.fixture
 def auth_manager():
-    return EmptyAuthManager(None)
+    return EmptyAuthManager(None, None)
 
 
-class TestBaseAuthManager:
-    def test_get_security_manager_override_class_return_empty_class(self, 
auth_manager):
-        assert auth_manager.get_security_manager_override_class() is 
AirflowSecurityManagerV2
+@pytest.fixture
+def auth_manager_with_appbuilder():
+    flask_app = Flask(__name__)
+    appbuilder = init_appbuilder(flask_app)
+    return EmptyAuthManager(flask_app, appbuilder)
 
-    def test_get_security_manager_not_defined(self, auth_manager):
-        with pytest.raises(AirflowException, match="Security manager not 
defined."):
-            _security_manager = auth_manager.security_manager
 
-    def test_get_security_manager_defined(self, auth_manager):
-        auth_manager.security_manager = ApplessAirflowSecurityManager()
-        _security_manager = auth_manager.security_manager
-        assert type(_security_manager) is ApplessAirflowSecurityManager
+class TestBaseAuthManager:
+    def test_get_cli_commands_return_empty_list(self, auth_manager):
+        assert auth_manager.get_cli_commands() == []
+
+    def test_get_api_endpoints_return_none(self, auth_manager):
+        assert auth_manager.get_api_endpoints() is None
+
+    @pytest.mark.db_test
+    def test_security_manager_return_default_security_manager(self, 
auth_manager_with_appbuilder):
+        assert isinstance(auth_manager_with_appbuilder.security_manager, 
AirflowSecurityManagerV2)
+
+    @pytest.mark.parametrize(
+        "access_all, access_per_dag, dag_ids, expected",
+        [
+            # Access to all dags
+            (
+                True,
+                {},
+                ["dag1", "dag2"],
+                {"dag1", "dag2"},
+            ),
+            # No access to any dag
+            (
+                False,
+                {},
+                ["dag1", "dag2"],
+                set(),
+            ),
+            # Access to specific dags
+            (
+                False,
+                {"dag1": True},
+                ["dag1", "dag2"],
+                {"dag1"},
+            ),
+        ],
+    )
+    def test_get_permitted_dag_ids(
+        self, auth_manager, access_all: bool, access_per_dag: dict, dag_ids: 
list, expected: set
+    ):
+        def side_effect_func(
+            *,
+            method: ResourceMethod,
+            access_entity: DagAccessEntity | None = None,
+            details: DagDetails | None = None,
+            user: BaseUser | None = None,
+        ):
+            if not details:
+                return access_all
+            else:
+                return access_per_dag.get(details.id, False)
+
+        auth_manager.is_authorized_dag = 
MagicMock(side_effect=side_effect_func)
+        user = Mock()
+        session = Mock()
+        dags = []
+        for dag_id in dag_ids:
+            mock = Mock()
+            mock.dag_id = dag_id
+            dags.append(mock)
+        session.execute.return_value = dags
+        result = auth_manager.get_permitted_dag_ids(user=user, session=session)
+        assert result == expected
diff --git a/tests/conftest.py b/tests/conftest.py
index 4067d06f0f..8fd97e95a0 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -280,8 +280,8 @@ def initial_db_init():
 
     from airflow.configuration import conf
     from airflow.utils import db
-    from airflow.www.app import sync_appbuilder_roles
     from airflow.www.extensions.init_appbuilder import init_appbuilder
+    from airflow.www.extensions.init_auth_manager import get_auth_manager
 
     db.resetdb()
     db.bootstrap_dagbag()
@@ -289,7 +289,7 @@ def initial_db_init():
     flask_app = Flask(__name__)
     flask_app.config["SQLALCHEMY_DATABASE_URI"] = conf.get("database", 
"SQL_ALCHEMY_CONN")
     init_appbuilder(flask_app)
-    sync_appbuilder_roles(flask_app)
+    get_auth_manager().init()
 
 
 @pytest.fixture(autouse=True, scope="session")
diff --git a/tests/test_utils/api_connexion_utils.py 
b/tests/test_utils/api_connexion_utils.py
index 836ad95d7b..83841acbcc 100644
--- a/tests/test_utils/api_connexion_utils.py
+++ b/tests/test_utils/api_connexion_utils.py
@@ -19,7 +19,7 @@ from __future__ import annotations
 from contextlib import contextmanager
 
 from airflow.api_connexion.exceptions import EXCEPTIONS_LINK_MAP
-from airflow.www.security_manager import EXISTING_ROLES
+from airflow.auth.managers.fab.security_manager.constants import EXISTING_ROLES
 
 
 @contextmanager
diff --git a/tests/test_utils/decorators.py b/tests/test_utils/decorators.py
index f6d4b91d91..5b028c694a 100644
--- a/tests/test_utils/decorators.py
+++ b/tests/test_utils/decorators.py
@@ -42,7 +42,6 @@ def dont_initialize_flask_app_submodules(_func=None, *, 
skip_all_except=None):
             "init_api_experimental",
             "init_api_auth_provider",
             "init_api_error_handlers",
-            "sync_appbuilder_roles",
             "init_jinja_globals",
             "init_xframe_protection",
             "init_airflow_session_interface",
diff --git a/tests/www/test_security.py b/tests/www/test_security.py
index 6f6e3ef35b..641b394808 100644
--- a/tests/www/test_security.py
+++ b/tests/www/test_security.py
@@ -892,7 +892,9 @@ def test_create_dag_specific_permissions(session, 
security_manager, monkeypatch,
     dagbag_class_mock.return_value = dagbag_mock
     import airflow.www.security
 
-    monkeypatch.setitem(airflow.www.security_manager.__dict__, "DagBag", 
dagbag_class_mock)
+    monkeypatch.setitem(
+        airflow.auth.managers.fab.security_manager.override.__dict__, 
"DagBag", dagbag_class_mock
+    )
     security_manager._sync_dag_view_permissions = mock.Mock()
 
     for dag in sample_dags:

Reply via email to