This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new aaa2aedb1c Use `is_authorized_*` APIs in views (#35000)
aaa2aedb1c is described below

commit aaa2aedb1c77a1963fa002b4b6cbc882bf648983
Author: Vincent <97131062+vincb...@users.noreply.github.com>
AuthorDate: Thu Nov 2 12:18:45 2023 -0400

    Use `is_authorized_*` APIs in views (#35000)
---
 airflow/api_connexion/endpoints/plugin_endpoint.py |   3 +-
 .../api_connexion/endpoints/provider_endpoint.py   |   3 +-
 airflow/api_connexion/security.py                  |   5 +-
 airflow/auth/managers/base_auth_manager.py         |  27 ++-
 airflow/auth/managers/fab/fab_auth_manager.py      |  56 ++++---
 .../auth/managers/fab/security_manager/override.py |   8 +-
 airflow/auth/managers/models/base_user.py          |  15 --
 airflow/auth/managers/models/resource_details.py   |  14 ++
 .../{models/base_user.py => utils/__init__.py}     |  31 ----
 .../managers/utils/fab.py}                         |  40 +++--
 airflow/security/permissions.py                    |   2 -
 airflow/www/auth.py                                |  13 +-
 airflow/www/extensions/init_appbuilder_links.py    |  24 ++-
 airflow/www/security_manager.py                    | 186 +++++++++++++++++++--
 airflow/www/views.py                               |   9 +-
 docs/apache-airflow/empty_plugin/empty_plugin.py   |   5 +-
 tests/auth/managers/fab/test_fab_auth_manager.py   |  53 +++++-
 tests/auth/managers/test_base_auth_manager.py      |  12 +-
 tests/www/test_security.py                         |  36 ++--
 tests/www/views/test_views_dagrun.py               |  33 +++-
 tests/www/views/test_views_tasks.py                |   2 +
 21 files changed, 426 insertions(+), 151 deletions(-)

diff --git a/airflow/api_connexion/endpoints/plugin_endpoint.py 
b/airflow/api_connexion/endpoints/plugin_endpoint.py
index 500bd65749..5a100fd6d5 100644
--- a/airflow/api_connexion/endpoints/plugin_endpoint.py
+++ b/airflow/api_connexion/endpoints/plugin_endpoint.py
@@ -21,13 +21,14 @@ from typing import TYPE_CHECKING
 from airflow.api_connexion import security
 from airflow.api_connexion.parameters import check_limit, format_parameters
 from airflow.api_connexion.schemas.plugin_schema import PluginCollection, 
plugin_collection_schema
+from airflow.auth.managers.models.resource_details import AccessView
 from airflow.plugins_manager import get_plugin_info
 
 if TYPE_CHECKING:
     from airflow.api_connexion.types import APIResponse
 
 
-@security.requires_access_website()
+@security.requires_access_view(AccessView.PLUGINS)
 @format_parameters({"limit": check_limit})
 def get_plugins(*, limit: int, offset: int = 0) -> APIResponse:
     """Get plugins endpoint."""
diff --git a/airflow/api_connexion/endpoints/provider_endpoint.py 
b/airflow/api_connexion/endpoints/provider_endpoint.py
index a64368dce3..d9ba0c819b 100644
--- a/airflow/api_connexion/endpoints/provider_endpoint.py
+++ b/airflow/api_connexion/endpoints/provider_endpoint.py
@@ -26,6 +26,7 @@ from airflow.api_connexion.schemas.provider_schema import (
     ProviderCollection,
     provider_collection_schema,
 )
+from airflow.auth.managers.models.resource_details import AccessView
 from airflow.providers_manager import ProvidersManager
 
 if TYPE_CHECKING:
@@ -45,7 +46,7 @@ def _provider_mapper(provider: ProviderInfo) -> Provider:
     )
 
 
-@security.requires_access_website()
+@security.requires_access_view(AccessView.PROVIDERS)
 def get_providers() -> APIResponse:
     """Get providers."""
     providers = [_provider_mapper(d) for d in 
ProvidersManager().providers.values()]
diff --git a/airflow/api_connexion/security.py 
b/airflow/api_connexion/security.py
index 59edb8ea35..956e7a2300 100644
--- a/airflow/api_connexion/security.py
+++ b/airflow/api_connexion/security.py
@@ -24,6 +24,7 @@ from flask import Response, g
 
 from airflow.api_connexion.exceptions import PermissionDenied, Unauthenticated
 from airflow.auth.managers.models.resource_details import (
+    AccessView,
     ConfigurationDetails,
     ConnectionDetails,
     DagAccessEntity,
@@ -229,12 +230,12 @@ def requires_access_variable(method: ResourceMethod) -> 
Callable[[T], T]:
     return requires_access_decorator
 
 
-def requires_access_website() -> Callable[[T], T]:
+def requires_access_view(access_view: AccessView) -> Callable[[T], T]:
     def requires_access_decorator(func: T):
         @wraps(func)
         def decorated(*args, **kwargs):
             return _requires_access(
-                is_authorized_callback=lambda: 
get_auth_manager().is_authorized_website(),
+                is_authorized_callback=lambda: 
get_auth_manager().is_authorized_view(access_view=access_view),
                 func=func,
                 args=args,
                 kwargs=kwargs,
diff --git a/airflow/auth/managers/base_auth_manager.py 
b/airflow/auth/managers/base_auth_manager.py
index 7d79fcce3e..3b25098ab8 100644
--- a/airflow/auth/managers/base_auth_manager.py
+++ b/airflow/auth/managers/base_auth_manager.py
@@ -26,6 +26,7 @@ from sqlalchemy import select
 from airflow.auth.managers.models.resource_details import (
     DagDetails,
 )
+from airflow.exceptions import AirflowException
 from airflow.models import DagModel
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.session import NEW_SESSION, provide_session
@@ -37,6 +38,7 @@ if TYPE_CHECKING:
 
     from airflow.auth.managers.models.base_user import BaseUser
     from airflow.auth.managers.models.resource_details import (
+        AccessView,
         ConfigurationDetails,
         ConnectionDetails,
         DagAccessEntity,
@@ -216,18 +218,37 @@ class BaseAuthManager(LoggingMixin):
         """
 
     @abstractmethod
-    def is_authorized_website(
+    def is_authorized_view(
         self,
         *,
+        access_view: AccessView,
         user: BaseUser | None = None,
     ) -> bool:
         """
-        Return whether the user is authorized to access the read-only state of 
the installation.
+        Return whether the user is authorized to access a read-only state of 
the installation.
 
-        This includes the homepage, the list of installed plugins, the list of 
providers and list of triggers.
+        :param access_view: the specific read-only view/state the 
authorization request is about.
+        :param user: the user to perform the action on. If not provided (or 
None), it uses the current user
+        """
+
+    def is_authorized_custom_view(
+        self, *, fab_action_name: str, fab_resource_name: str, user: BaseUser 
| None = None
+    ):
+        """
+        Return whether the user is authorized to perform a given action on a 
custom view.
+
+        A custom view is a view defined as part of the auth manager. This view 
is then only available when
+        the auth manager is used as part of the environment.
+
+        By default, it throws an exception because auth managers do not define 
custom views by default.
+        If an auth manager defines some custom views, it needs to override 
this method.
 
+        :param fab_action_name: the name of the FAB action defined in the view 
in ``base_permissions``
+        :param fab_resource_name: the name of the FAB resource defined in the 
view in
+            ``class_permission_name``
         :param user: the user to perform the action on. If not provided (or 
None), it uses the current user
         """
+        raise AirflowException(f"The resource `{fab_resource_name}` does not 
exist in the environment.")
 
     @provide_session
     def get_permitted_dag_ids(
diff --git a/airflow/auth/managers/fab/fab_auth_manager.py 
b/airflow/auth/managers/fab/fab_auth_manager.py
index 1b65e3b76d..beb11da06c 100644
--- a/airflow/auth/managers/fab/fab_auth_manager.py
+++ b/airflow/auth/managers/fab/fab_auth_manager.py
@@ -35,6 +35,7 @@ from airflow.auth.managers.fab.cli_commands.definition import 
(
 )
 from airflow.auth.managers.fab.models import Permission, Role, User
 from airflow.auth.managers.models.resource_details import (
+    AccessView,
     ConfigurationDetails,
     ConnectionDetails,
     DagAccessEntity,
@@ -43,6 +44,7 @@ from airflow.auth.managers.models.resource_details import (
     PoolDetails,
     VariableDetails,
 )
+from airflow.auth.managers.utils.fab import get_fab_action_from_method_map, 
get_method_from_fab_action_map
 from airflow.cli.cli_config import (
     GroupCommand,
 )
@@ -52,9 +54,6 @@ from airflow.models import DagModel
 from airflow.security import permissions
 from airflow.security.permissions import (
     ACTION_CAN_ACCESS_MENU,
-    ACTION_CAN_CREATE,
-    ACTION_CAN_DELETE,
-    ACTION_CAN_EDIT,
     ACTION_CAN_READ,
     RESOURCE_AUDIT_LOG,
     RESOURCE_CLUSTER_ACTIVITY,
@@ -67,12 +66,16 @@ from airflow.security.permissions import (
     RESOURCE_DAG_RUN,
     RESOURCE_DAG_WARNING,
     RESOURCE_DATASET,
+    RESOURCE_DOCS,
     RESOURCE_IMPORT_ERROR,
+    RESOURCE_JOB,
     RESOURCE_PLUGIN,
     RESOURCE_POOL,
     RESOURCE_PROVIDER,
+    RESOURCE_SLA_MISS,
     RESOURCE_TASK_INSTANCE,
     RESOURCE_TASK_LOG,
+    RESOURCE_TASK_RESCHEDULE,
     RESOURCE_TRIGGER,
     RESOURCE_VARIABLE,
     RESOURCE_WEBSITE,
@@ -89,13 +92,6 @@ if TYPE_CHECKING:
         CLICommand,
     )
 
-MAP_METHOD_NAME_TO_FAB_ACTION_NAME: dict[ResourceMethod, str] = {
-    "POST": ACTION_CAN_CREATE,
-    "GET": ACTION_CAN_READ,
-    "PUT": ACTION_CAN_EDIT,
-    "DELETE": ACTION_CAN_DELETE,
-}
-
 _MAP_DAG_ACCESS_ENTITY_TO_FAB_RESOURCE_TYPE: dict[DagAccessEntity, tuple[str, 
...]] = {
     DagAccessEntity.AUDIT_LOG: (RESOURCE_AUDIT_LOG,),
     DagAccessEntity.CODE: (RESOURCE_DAG_CODE,),
@@ -110,10 +106,22 @@ _MAP_DAG_ACCESS_ENTITY_TO_FAB_RESOURCE_TYPE: 
dict[DagAccessEntity, tuple[str, ..
     DagAccessEntity.TASK: (RESOURCE_TASK_INSTANCE,),
     DagAccessEntity.TASK_INSTANCE: (RESOURCE_DAG_RUN, RESOURCE_TASK_INSTANCE),
     DagAccessEntity.TASK_LOGS: (RESOURCE_TASK_LOG,),
+    DagAccessEntity.TASK_RESCHEDULE: (RESOURCE_TASK_RESCHEDULE,),
     DagAccessEntity.WARNING: (RESOURCE_DAG_WARNING,),
     DagAccessEntity.XCOM: (RESOURCE_XCOM,),
 }
 
+_MAP_ACCESS_VIEW_TO_FAB_RESOURCE_TYPE = {
+    AccessView.CLUSTER_ACTIVITY: RESOURCE_CLUSTER_ACTIVITY,
+    AccessView.DOCS: RESOURCE_DOCS,
+    AccessView.JOBS: RESOURCE_JOB,
+    AccessView.PLUGINS: RESOURCE_PLUGIN,
+    AccessView.PROVIDERS: RESOURCE_PROVIDER,
+    AccessView.TRIGGERS: RESOURCE_TRIGGER,
+    AccessView.SLA: RESOURCE_SLA_MISS,
+    AccessView.WEBSITE: RESOURCE_WEBSITE,
+}
+
 
 class FabAuthManager(BaseAuthManager):
     """
@@ -271,14 +279,18 @@ class FabAuthManager(BaseAuthManager):
     ) -> bool:
         return self._is_authorized(method=method, 
resource_type=RESOURCE_VARIABLE, user=user)
 
-    def is_authorized_website(self, *, user: BaseUser | None = None) -> bool:
-        return (
-            self._is_authorized(method="GET", resource_type=RESOURCE_PLUGIN, 
user=user)
-            or self._is_authorized(method="GET", 
resource_type=RESOURCE_PROVIDER, user=user)
-            or self._is_authorized(method="GET", 
resource_type=RESOURCE_TRIGGER, user=user)
-            or self._is_authorized(method="GET", 
resource_type=RESOURCE_WEBSITE, user=user)
+    def is_authorized_view(self, *, access_view: AccessView, user: BaseUser | 
None = None) -> bool:
+        return self._is_authorized(
+            method="GET", 
resource_type=_MAP_ACCESS_VIEW_TO_FAB_RESOURCE_TYPE[access_view], user=user
         )
 
+    def is_authorized_custom_view(
+        self, *, fab_action_name: str, fab_resource_name: str, user: BaseUser 
| None = None
+    ):
+        if not user:
+            user = self.get_user()
+        return (fab_action_name, fab_resource_name) in 
self._get_user_permissions(user)
+
     @provide_session
     def get_permitted_dag_ids(
         self,
@@ -312,8 +324,7 @@ class FabAuthManager(BaseAuthManager):
             )
             roles = user_query.roles
 
-        map_fab_action_name_to_method_name = {v: k for k, v in 
MAP_METHOD_NAME_TO_FAB_ACTION_NAME.items()}
-        map_fab_action_name_to_method_name[ACTION_CAN_ACCESS_MENU] = "GET"
+        map_fab_action_name_to_method_name = get_method_from_fab_action_map()
         resources = set()
         for role in roles:
             for permission in role.permissions:
@@ -437,9 +448,10 @@ class FabAuthManager(BaseAuthManager):
 
         :meta private:
         """
-        if method not in MAP_METHOD_NAME_TO_FAB_ACTION_NAME:
+        fab_action_from_method_map = get_fab_action_from_method_map()
+        if method not in fab_action_from_method_map:
             raise AirflowException(f"Unknown method: {method}")
-        return MAP_METHOD_NAME_TO_FAB_ACTION_NAME[method]
+        return fab_action_from_method_map[method]
 
     @staticmethod
     def _get_fab_resource_types(dag_access_entity: DagAccessEntity) -> 
tuple[str, ...]:
@@ -482,9 +494,9 @@ class FabAuthManager(BaseAuthManager):
 
         :meta private:
         """
+        perms = getattr(user, "perms") or []
         return [
-            (ACTION_CAN_READ if perm[0] == ACTION_CAN_ACCESS_MENU else 
perm[0], perm[1])
-            for perm in user.perms
+            (ACTION_CAN_READ if perm[0] == ACTION_CAN_ACCESS_MENU else 
perm[0], perm[1]) for perm in perms
         ]
 
     def _get_root_dag_id(self, dag_id: str) -> str:
diff --git a/airflow/auth/managers/fab/security_manager/override.py 
b/airflow/auth/managers/fab/security_manager/override.py
index 9bc34912f2..257c284920 100644
--- a/airflow/auth/managers/fab/security_manager/override.py
+++ b/airflow/auth/managers/fab/security_manager/override.py
@@ -55,7 +55,6 @@ from sqlalchemy.exc import MultipleResultsFound
 from sqlalchemy.orm import Session, joinedload
 from werkzeug.security import check_password_hash, generate_password_hash
 
-from airflow.auth.managers.fab.fab_auth_manager import 
MAP_METHOD_NAME_TO_FAB_ACTION_NAME
 from airflow.auth.managers.fab.models import (
     Action,
     Permission,
@@ -66,6 +65,7 @@ from airflow.auth.managers.fab.models import (
 )
 from airflow.auth.managers.fab.models.anonymous_user import AnonymousUser
 from airflow.auth.managers.fab.security_manager.constants import EXISTING_ROLES
+from airflow.auth.managers.utils.fab import get_method_from_fab_action_map
 from airflow.configuration import conf
 from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
 from airflow.models import DagBag, DagModel
@@ -735,10 +735,10 @@ class 
FabAirflowSecurityManagerOverride(AirflowSecurityManagerV2):
         )
         if not user_actions:
             user_actions = [permissions.ACTION_CAN_EDIT, 
permissions.ACTION_CAN_READ]
-        fab_action_name_to_method_name = {v: k for k, v in 
MAP_METHOD_NAME_TO_FAB_ACTION_NAME.items()}
+        method_from_fab_action_map = get_method_from_fab_action_map()
         user_methods: Container[ResourceMethod] = [
-            fab_action_name_to_method_name[action]
-            for action in fab_action_name_to_method_name
+            method_from_fab_action_map[action]
+            for action in method_from_fab_action_map
             if action in user_actions
         ]
         return get_auth_manager().get_permitted_dag_ids(user=user, 
methods=user_methods, session=session)
diff --git a/airflow/auth/managers/models/base_user.py 
b/airflow/auth/managers/models/base_user.py
index 7bc17cc240..7cb45d8389 100644
--- a/airflow/auth/managers/models/base_user.py
+++ b/airflow/auth/managers/models/base_user.py
@@ -18,31 +18,16 @@
 from __future__ import annotations
 
 from abc import abstractmethod
-from typing import Any
 
 
 class BaseUser:
     """User model interface."""
 
-    @property
-    def is_authenticated(self) -> bool:
-        return not self.is_anonymous
-
     @property
     @abstractmethod
     def is_active(self) -> bool:
         ...
 
-    @property
-    @abstractmethod
-    def is_anonymous(self) -> bool:
-        ...
-
-    @property
-    @abstractmethod
-    def perms(self) -> Any:
-        ...
-
     @abstractmethod
     def get_id(self) -> str:
         ...
diff --git a/airflow/auth/managers/models/resource_details.py 
b/airflow/auth/managers/models/resource_details.py
index 1f98ba72ce..a64ef68978 100644
--- a/airflow/auth/managers/models/resource_details.py
+++ b/airflow/auth/managers/models/resource_details.py
@@ -63,6 +63,19 @@ class VariableDetails:
     key: str | None = None
 
 
+class AccessView(Enum):
+    """Enum of specific views the user tries to access."""
+
+    CLUSTER_ACTIVITY = "CLUSTER_ACTIVITY"
+    DOCS = "DOCS"
+    JOBS = "JOBS"
+    PLUGINS = "PLUGINS"
+    PROVIDERS = "PROVIDERS"
+    TRIGGERS = "TRIGGERS"
+    SLA = "SLA"
+    WEBSITE = "WEBSITE"
+
+
 class DagAccessEntity(Enum):
     """Enum of DAG entities the user tries to access."""
 
@@ -73,6 +86,7 @@ class DagAccessEntity(Enum):
     RUN = "RUN"
     TASK = "TASK"
     TASK_INSTANCE = "TASK_INSTANCE"
+    TASK_RESCHEDULE = "TASK_RESCHEDULE"
     TASK_LOGS = "TASK_LOGS"
     WARNING = "WARNING"
     XCOM = "XCOM"
diff --git a/airflow/auth/managers/models/base_user.py 
b/airflow/auth/managers/utils/__init__.py
similarity index 59%
copy from airflow/auth/managers/models/base_user.py
copy to airflow/auth/managers/utils/__init__.py
index 7bc17cc240..217e5db960 100644
--- a/airflow/auth/managers/models/base_user.py
+++ b/airflow/auth/managers/utils/__init__.py
@@ -15,34 +15,3 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-from __future__ import annotations
-
-from abc import abstractmethod
-from typing import Any
-
-
-class BaseUser:
-    """User model interface."""
-
-    @property
-    def is_authenticated(self) -> bool:
-        return not self.is_anonymous
-
-    @property
-    @abstractmethod
-    def is_active(self) -> bool:
-        ...
-
-    @property
-    @abstractmethod
-    def is_anonymous(self) -> bool:
-        ...
-
-    @property
-    @abstractmethod
-    def perms(self) -> Any:
-        ...
-
-    @abstractmethod
-    def get_id(self) -> str:
-        ...
diff --git a/airflow/api_connexion/endpoints/plugin_endpoint.py 
b/airflow/auth/managers/utils/fab.py
similarity index 50%
copy from airflow/api_connexion/endpoints/plugin_endpoint.py
copy to airflow/auth/managers/utils/fab.py
index 500bd65749..316e5ecff1 100644
--- a/airflow/api_connexion/endpoints/plugin_endpoint.py
+++ b/airflow/auth/managers/utils/fab.py
@@ -1,3 +1,4 @@
+#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -18,19 +19,34 @@ from __future__ import annotations
 
 from typing import TYPE_CHECKING
 
-from airflow.api_connexion import security
-from airflow.api_connexion.parameters import check_limit, format_parameters
-from airflow.api_connexion.schemas.plugin_schema import PluginCollection, 
plugin_collection_schema
-from airflow.plugins_manager import get_plugin_info
+from airflow.security.permissions import (
+    ACTION_CAN_ACCESS_MENU,
+    ACTION_CAN_CREATE,
+    ACTION_CAN_DELETE,
+    ACTION_CAN_EDIT,
+    ACTION_CAN_READ,
+)
 
 if TYPE_CHECKING:
-    from airflow.api_connexion.types import APIResponse
+    from airflow.auth.managers.base_auth_manager import ResourceMethod
+
+# Convert methods to FAB action name
+_MAP_METHOD_NAME_TO_FAB_ACTION_NAME: dict[ResourceMethod, str] = {
+    "POST": ACTION_CAN_CREATE,
+    "GET": ACTION_CAN_READ,
+    "PUT": ACTION_CAN_EDIT,
+    "DELETE": ACTION_CAN_DELETE,
+}
+
+
+def get_fab_action_from_method_map():
+    """Returns the map associating a method to a FAB action."""
+    return _MAP_METHOD_NAME_TO_FAB_ACTION_NAME
 
 
-@security.requires_access_website()
-@format_parameters({"limit": check_limit})
-def get_plugins(*, limit: int, offset: int = 0) -> APIResponse:
-    """Get plugins endpoint."""
-    plugins_info = get_plugin_info()
-    collection = PluginCollection(plugins=plugins_info[offset:][:limit], 
total_entries=len(plugins_info))
-    return plugin_collection_schema.dump(collection)
+def get_method_from_fab_action_map():
+    """Returns the map associating a FAB action to a method."""
+    return {
+        **{v: k for k, v in _MAP_METHOD_NAME_TO_FAB_ACTION_NAME.items()},
+        ACTION_CAN_ACCESS_MENU: "GET",
+    }
diff --git a/airflow/security/permissions.py b/airflow/security/permissions.py
index a5c862c170..780c2463b0 100644
--- a/airflow/security/permissions.py
+++ b/airflow/security/permissions.py
@@ -19,7 +19,6 @@ from __future__ import annotations
 # Resource Constants
 RESOURCE_ACTION = "Permissions"
 RESOURCE_ADMIN_MENU = "Admin"
-RESOURCE_AIRFLOW = "Airflow"
 RESOURCE_AUDIT_LOG = "Audit Logs"
 RESOURCE_BROWSE_MENU = "Browse"
 RESOURCE_CONFIG = "Configurations"
@@ -36,7 +35,6 @@ RESOURCE_DOCS = "Documentation"
 RESOURCE_DOCS_MENU = "Docs"
 RESOURCE_IMPORT_ERROR = "ImportError"
 RESOURCE_JOB = "Jobs"
-RESOURCE_LOGIN = "Logins"
 RESOURCE_MY_PASSWORD = "My Password"
 RESOURCE_MY_PROFILE = "My Profile"
 RESOURCE_PASSWORD = "Passwords"
diff --git a/airflow/www/auth.py b/airflow/www/auth.py
index 8fb6ffb435..8e9231fdec 100644
--- a/airflow/www/auth.py
+++ b/airflow/www/auth.py
@@ -21,9 +21,10 @@ import warnings
 from functools import wraps
 from typing import TYPE_CHECKING, Callable, Sequence, TypeVar, cast
 
-from flask import flash, g, redirect, render_template, request
+from flask import flash, redirect, render_template, request
 
 from airflow.auth.managers.models.resource_details import (
+    AccessView,
     ConnectionDetails,
     DagAccessEntity,
     DagDetails,
@@ -107,7 +108,9 @@ def _has_access(*, is_authorized: bool, func: Callable, 
args, kwargs):
     """
     if is_authorized:
         return func(*args, **kwargs)
-    elif get_auth_manager().is_logged_in() and not g.user.perms:
+    elif get_auth_manager().is_logged_in() and not 
get_auth_manager().is_authorized_view(
+        access_view=AccessView.WEBSITE
+    ):
         return (
             render_template(
                 "airflow/no_roles_permissions.html",
@@ -215,6 +218,6 @@ def has_access_variable(method: ResourceMethod) -> 
Callable[[T], T]:
     return _has_access_no_details(lambda: 
get_auth_manager().is_authorized_variable(method=method))
 
 
-def has_access_website() -> Callable[[T], T]:
-    """Check current user's permissions to access the website."""
-    return _has_access_no_details(lambda: 
get_auth_manager().is_authorized_website())
+def has_access_view(access_view: AccessView = AccessView.WEBSITE) -> 
Callable[[T], T]:
+    """Decorator that checks current user's permissions to access the 
website."""
+    return _has_access_no_details(lambda: 
get_auth_manager().is_authorized_view(access_view=access_view))
diff --git a/airflow/www/extensions/init_appbuilder_links.py 
b/airflow/www/extensions/init_appbuilder_links.py
index 6ae049edcd..0d2f4e13e9 100644
--- a/airflow/www/extensions/init_appbuilder_links.py
+++ b/airflow/www/extensions/init_appbuilder_links.py
@@ -17,6 +17,7 @@
 from __future__ import annotations
 
 from airflow.configuration import conf
+from airflow.security.permissions import RESOURCE_DOCS, RESOURCE_DOCS_MENU
 from airflow.utils.docs import get_docs_url
 
 
@@ -32,21 +33,32 @@ def init_appbuilder_links(app):
     appbuilder.menu.menu.insert(2, appbuilder.menu.menu.pop())  # Place in the 
third menu slot
 
     # Docs links
-    appbuilder.add_link(name="Documentation", label="Documentation", 
href=get_docs_url(), category="Docs")
     appbuilder.add_link(
-        name="Documentation", label="Airflow Website", 
href="https://airflow.apache.org";, category="Docs"
+        name=RESOURCE_DOCS, label="Documentation", href=get_docs_url(), 
category=RESOURCE_DOCS_MENU
     )
     appbuilder.add_link(
-        name="Documentation", label="GitHub Repo", 
href="https://github.com/apache/airflow";, category="Docs"
+        name=RESOURCE_DOCS,
+        label="Airflow Website",
+        href="https://airflow.apache.org";,
+        category=RESOURCE_DOCS_MENU,
+    )
+    appbuilder.add_link(
+        name=RESOURCE_DOCS,
+        label="GitHub Repo",
+        href="https://github.com/apache/airflow";,
+        category=RESOURCE_DOCS_MENU,
     )
 
     if conf.getboolean("webserver", "enable_swagger_ui", fallback=True):
         appbuilder.add_link(
-            name="Documentation",
+            name=RESOURCE_DOCS,
             label="REST API Reference (Swagger UI)",
             href="/api/v1./api/v1_swagger_ui_index",
-            category="Docs",
+            category=RESOURCE_DOCS_MENU,
         )
     appbuilder.add_link(
-        name="Documentation", label="REST API Reference (Redoc)", 
href="RedocView.redoc", category="Docs"
+        name=RESOURCE_DOCS,
+        label="REST API Reference (Redoc)",
+        href="RedocView.redoc",
+        category=RESOURCE_DOCS_MENU,
     )
diff --git a/airflow/www/security_manager.py b/airflow/www/security_manager.py
index 1d6bc592a5..de183b1d7f 100644
--- a/airflow/www/security_manager.py
+++ b/airflow/www/security_manager.py
@@ -17,7 +17,8 @@
 from __future__ import annotations
 
 import warnings
-from typing import TYPE_CHECKING, Any, Collection, Sequence
+from functools import cached_property
+from typing import TYPE_CHECKING, Any, Callable, Collection, Sequence
 
 from flask import g
 from sqlalchemy import select
@@ -42,10 +43,42 @@ from airflow.auth.managers.fab.views.user_edit import (
     CustomUserInfoEditView,
 )
 from airflow.auth.managers.fab.views.user_stats import CustomUserStatsChartView
+from airflow.auth.managers.models.resource_details import AccessView, 
DagAccessEntity
+from airflow.auth.managers.utils.fab import (
+    get_method_from_fab_action_map,
+)
 from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
 from airflow.models import DagModel
 from airflow.security import permissions
+from airflow.security.permissions import (
+    ACTION_CAN_ACCESS_MENU,
+    ACTION_CAN_READ,
+    RESOURCE_ADMIN_MENU,
+    RESOURCE_AUDIT_LOG,
+    RESOURCE_BROWSE_MENU,
+    RESOURCE_CLUSTER_ACTIVITY,
+    RESOURCE_CONFIG,
+    RESOURCE_CONNECTION,
+    RESOURCE_DAG,
+    RESOURCE_DAG_CODE,
+    RESOURCE_DAG_DEPENDENCIES,
+    RESOURCE_DAG_RUN,
+    RESOURCE_DATASET,
+    RESOURCE_DOCS,
+    RESOURCE_DOCS_MENU,
+    RESOURCE_JOB,
+    RESOURCE_PLUGIN,
+    RESOURCE_POOL,
+    RESOURCE_PROVIDER,
+    RESOURCE_SLA_MISS,
+    RESOURCE_TASK_INSTANCE,
+    RESOURCE_TASK_RESCHEDULE,
+    RESOURCE_TRIGGER,
+    RESOURCE_VARIABLE,
+    RESOURCE_XCOM,
+)
 from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.www.extensions.init_auth_manager import get_auth_manager
 from airflow.www.fab_security.sqla.manager import SecurityManager
 from airflow.www.utils import CustomSQLAInterface
 
@@ -53,8 +86,7 @@ EXISTING_ROLES = FAB_EXISTING_ROLES
 
 if TYPE_CHECKING:
     from airflow.auth.managers.fab.models import Permission, Resource
-
-    pass
+    from airflow.auth.managers.models.base_user import BaseUser
 
 
 class AirflowSecurityManagerV2(SecurityManager, LoggingMixin):
@@ -250,23 +282,36 @@ class AirflowSecurityManagerV2(SecurityManager, 
LoggingMixin):
 
         Example actions might include can_read, can_write, can_delete, etc.
 
+        This function is called by FAB when accessing a view. See
+        
https://github.com/dpgaspar/Flask-AppBuilder/blob/c6fecdc551629e15467fde5d06b4437379d90592/flask_appbuilder/security/decorators.py#L134
+
+        The resource ID (e.g. the connection ID) is not passed to this 
function (see above link). Therefore,
+        it is not possible to perform fine-grained access authorization with 
the resource ID yet. In other
+        words, we can only verify the user has access to all connections and 
not to a specific connection.
+        To make it happen, we either need to:
+         - Override all views in 'airflow/www/views.py' inheriting from 
`AirflowModelView` and use a custom
+         `has_access` decorator.
+         - Wait for the new Airflow UI to come.
+
         :param action_name: action_name on resource (e.g can_read, can_edit).
         :param resource_name: name of view-menu or resource.
-        :param user: user name
+        :param user: user
         :return: Whether user could perform certain action on the resource.
         :rtype bool
         """
         if not user:
             user = g.user
-        if (action_name, resource_name) in user.perms:
-            return True
 
-        if self.is_dag_resource(resource_name):
-            if (action_name, permissions.RESOURCE_DAG) in user.perms:
-                return True
-            return (action_name, resource_name) in user.perms
-
-        return False
+        is_authorized_method = 
self._get_auth_manager_is_authorized_method(resource_name)
+        if is_authorized_method:
+            return is_authorized_method(action_name, user)
+        else:
+            # This means the page the user is trying to access is specific to 
the auth manager used
+            # Example: the user list view in FabAuthManager
+            action_name = ACTION_CAN_READ if action_name == 
ACTION_CAN_ACCESS_MENU else action_name
+            return get_auth_manager().is_authorized_custom_view(
+                fab_action_name=action_name, fab_resource_name=resource_name, 
user=user
+            )
 
     def create_admin_standalone(self) -> tuple[str | None, str | None]:
         """Perform the required steps when initializing airflow for standalone 
mode.
@@ -371,3 +416,120 @@ class AirflowSecurityManagerV2(SecurityManager, 
LoggingMixin):
         raise NotImplementedError(
             "The method 'check_authorization' is only available with the auth 
manager FabAuthManager"
         )
+
+    @cached_property
+    def _auth_manager_is_authorized_map(self) -> dict[str, Callable[[str, 
BaseUser | None], bool]]:
+        """
+        Return the map associating a FAB resource name to the corresponding 
auth manager is_authorized_ API.
+
+        The function returned takes the FAB action name and the user as 
parameter.
+        """
+        auth_manager = get_auth_manager()
+        methods = get_method_from_fab_action_map()
+
+        return {
+            RESOURCE_AUDIT_LOG: lambda action, user: 
auth_manager.is_authorized_dag(
+                method=methods[action],
+                access_entity=DagAccessEntity.AUDIT_LOG,
+                user=user,
+            ),
+            RESOURCE_CLUSTER_ACTIVITY: lambda action, user: 
auth_manager.is_authorized_view(
+                access_view=AccessView.CLUSTER_ACTIVITY,
+                user=user,
+            ),
+            RESOURCE_CONFIG: lambda action, user: 
auth_manager.is_authorized_configuration(
+                method=methods[action],
+                user=user,
+            ),
+            RESOURCE_CONNECTION: lambda action, user: 
auth_manager.is_authorized_connection(
+                method=methods[action],
+                user=user,
+            ),
+            RESOURCE_DAG: lambda action, user: auth_manager.is_authorized_dag(
+                method=methods[action],
+                user=user,
+            ),
+            RESOURCE_DAG_CODE: lambda action, user: 
auth_manager.is_authorized_dag(
+                method=methods[action],
+                access_entity=DagAccessEntity.CODE,
+                user=user,
+            ),
+            RESOURCE_DAG_DEPENDENCIES: lambda action, user: 
auth_manager.is_authorized_dag(
+                method=methods[action],
+                access_entity=DagAccessEntity.DEPENDENCIES,
+                user=user,
+            ),
+            RESOURCE_DAG_RUN: lambda action, user: 
auth_manager.is_authorized_dag(
+                method=methods[action],
+                access_entity=DagAccessEntity.RUN,
+                user=user,
+            ),
+            RESOURCE_DATASET: lambda action, user: 
auth_manager.is_authorized_dataset(
+                method=methods[action],
+                user=user,
+            ),
+            RESOURCE_DOCS: lambda action, user: 
auth_manager.is_authorized_view(
+                access_view=AccessView.DOCS,
+                user=user,
+            ),
+            RESOURCE_PLUGIN: lambda action, user: 
auth_manager.is_authorized_view(
+                access_view=AccessView.PLUGINS,
+                user=user,
+            ),
+            RESOURCE_JOB: lambda action, user: auth_manager.is_authorized_view(
+                access_view=AccessView.JOBS,
+                user=user,
+            ),
+            RESOURCE_POOL: lambda action, user: 
auth_manager.is_authorized_pool(
+                method=methods[action],
+                user=user,
+            ),
+            RESOURCE_PROVIDER: lambda action, user: 
auth_manager.is_authorized_view(
+                access_view=AccessView.PROVIDERS,
+                user=user,
+            ),
+            RESOURCE_SLA_MISS: lambda action, user: 
auth_manager.is_authorized_view(
+                access_view=AccessView.SLA,
+                user=user,
+            ),
+            RESOURCE_TASK_INSTANCE: lambda action, user: 
auth_manager.is_authorized_dag(
+                method=methods[action],
+                access_entity=DagAccessEntity.TASK_INSTANCE,
+                user=user,
+            ),
+            RESOURCE_TASK_RESCHEDULE: lambda action, user: 
auth_manager.is_authorized_dag(
+                method=methods[action],
+                access_entity=DagAccessEntity.TASK_RESCHEDULE,
+                user=user,
+            ),
+            RESOURCE_TRIGGER: lambda action, user: 
auth_manager.is_authorized_view(
+                access_view=AccessView.TRIGGERS,
+                user=user,
+            ),
+            RESOURCE_VARIABLE: lambda action, user: 
auth_manager.is_authorized_variable(
+                method=methods[action],
+                user=user,
+            ),
+            RESOURCE_XCOM: lambda action, user: auth_manager.is_authorized_dag(
+                method=methods[action],
+                access_entity=DagAccessEntity.XCOM,
+                user=user,
+            ),
+        }
+
+    def _get_auth_manager_is_authorized_method(self, fab_resource_name: str) 
-> Callable | None:
+        is_authorized_method = 
self._auth_manager_is_authorized_map.get(fab_resource_name)
+        if is_authorized_method:
+            return is_authorized_method
+        elif fab_resource_name in [RESOURCE_DOCS_MENU, RESOURCE_ADMIN_MENU, 
RESOURCE_BROWSE_MENU]:
+            # Display the "Browse", "Admin" and "Docs" dropdowns in the menu 
if the user has access to at
+            # least one dropdown child
+            return self._is_authorized_category_menu(fab_resource_name)
+        else:
+            return None
+
+    def _is_authorized_category_menu(self, category: str) -> Callable:
+        items = {item.name for item in 
self.appbuilder.menu.find(category).childs}
+        return lambda action, user: any(
+            
self._get_auth_manager_is_authorized_method(fab_resource_name=item) for item in 
items
+        )
diff --git a/airflow/www/views.py b/airflow/www/views.py
index bf8fa85873..1c7a795a5a 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -85,7 +85,7 @@ from airflow.api.common.mark_tasks import (
     set_dag_run_state_to_success,
     set_state,
 )
-from airflow.auth.managers.models.resource_details import DagAccessEntity, 
DagDetails
+from airflow.auth.managers.models.resource_details import AccessView, 
DagAccessEntity, DagDetails
 from airflow.compat.functools import cache
 from airflow.configuration import AIRFLOW_CONFIG, conf
 from airflow.datasets import Dataset
@@ -731,7 +731,7 @@ class Airflow(AirflowBaseView):
         return flask.json.jsonify(airflow_health_status)
 
     @expose("/home")
-    @auth.has_access_website()
+    @auth.has_access_view()
     def index(self):
         """Home view."""
         from airflow.models.dag import DagOwnerAttributes
@@ -4527,7 +4527,7 @@ class PluginView(AirflowBaseView):
     plugins_attributes_to_dump = PLUGINS_ATTRIBUTES_TO_DUMP
 
     @expose("/plugin")
-    @auth.has_access_website()
+    @auth.has_access_view(AccessView.PLUGINS)
     def list(self):
         """List loaded plugins."""
         plugins_manager.ensure_plugins_loaded()
@@ -4575,7 +4575,7 @@ class ProviderView(AirflowBaseView):
     ]
 
     @expose("/provider")
-    @auth.has_access_website()
+    @auth.has_access_view(AccessView.PROVIDERS)
     def list(self):
         """List providers."""
         providers_manager = ProvidersManager()
@@ -4973,6 +4973,7 @@ class DagRunModelView(AirflowPrivilegeVerifierModelView):
 
     class_permission_name = permissions.RESOURCE_DAG_RUN
     method_permission_name = {
+        "add": "create",
         "delete": "delete",
         "edit": "edit",
         "list": "read",
diff --git a/docs/apache-airflow/empty_plugin/empty_plugin.py 
b/docs/apache-airflow/empty_plugin/empty_plugin.py
index 6686eb1b67..31a782d2a4 100644
--- a/docs/apache-airflow/empty_plugin/empty_plugin.py
+++ b/docs/apache-airflow/empty_plugin/empty_plugin.py
@@ -21,8 +21,9 @@ from __future__ import annotations
 from flask import Blueprint
 from flask_appbuilder import BaseView, expose
 
+from airflow.auth.managers.models.resource_details import AccessView
 from airflow.plugins_manager import AirflowPlugin
-from airflow.www.auth import has_access_website
+from airflow.www.auth import has_access_view
 
 
 class EmptyPluginView(BaseView):
@@ -31,7 +32,7 @@ class EmptyPluginView(BaseView):
     default_view = "index"
 
     @expose("/")
-    @has_access_website()
+    @has_access_view(AccessView.PLUGINS)
     def index(self):
         """Create default view"""
         return self.render_template("empty_plugin/index.html", name="Empty 
Plugin")
diff --git a/tests/auth/managers/fab/test_fab_auth_manager.py 
b/tests/auth/managers/fab/test_fab_auth_manager.py
index d9f58fded1..f7cb7070d7 100644
--- a/tests/auth/managers/fab/test_fab_auth_manager.py
+++ b/tests/auth/managers/fab/test_fab_auth_manager.py
@@ -26,7 +26,7 @@ from flask import Flask
 from airflow.auth.managers.fab.fab_auth_manager import FabAuthManager
 from airflow.auth.managers.fab.models import User
 from airflow.auth.managers.fab.security_manager.override import 
FabAirflowSecurityManagerOverride
-from airflow.auth.managers.models.resource_details import DagAccessEntity, 
DagDetails
+from airflow.auth.managers.models.resource_details import AccessView, 
DagAccessEntity, DagDetails
 from airflow.exceptions import AirflowException
 from airflow.security.permissions import (
     ACTION_CAN_ACCESS_MENU,
@@ -40,7 +40,12 @@ from airflow.security.permissions import (
     RESOURCE_DAG,
     RESOURCE_DAG_RUN,
     RESOURCE_DATASET,
+    RESOURCE_JOB,
+    RESOURCE_PLUGIN,
+    RESOURCE_PROVIDER,
+    RESOURCE_SLA_MISS,
     RESOURCE_TASK_INSTANCE,
+    RESOURCE_TRIGGER,
     RESOURCE_VARIABLE,
     RESOURCE_WEBSITE,
 )
@@ -331,24 +336,62 @@ class TestFabAuthManager:
         assert result == expected_result
 
     @pytest.mark.parametrize(
-        "user_permissions, expected_result",
+        "access_view, user_permissions, expected_result",
         [
-            # With permission
+            # With permission (jobs)
             (
+                AccessView.JOBS,
+                [(ACTION_CAN_READ, RESOURCE_JOB)],
+                True,
+            ),
+            # With permission (plugins)
+            (
+                AccessView.PLUGINS,
+                [(ACTION_CAN_READ, RESOURCE_PLUGIN)],
+                True,
+            ),
+            # With permission (providers)
+            (
+                AccessView.PROVIDERS,
+                [(ACTION_CAN_READ, RESOURCE_PROVIDER)],
+                True,
+            ),
+            # With permission (triggers)
+            (
+                AccessView.TRIGGERS,
+                [(ACTION_CAN_READ, RESOURCE_TRIGGER)],
+                True,
+            ),
+            # With permission (SLA)
+            (
+                AccessView.SLA,
+                [(ACTION_CAN_READ, RESOURCE_SLA_MISS)],
+                True,
+            ),
+            # With permission (website)
+            (
+                AccessView.WEBSITE,
                 [(ACTION_CAN_READ, RESOURCE_WEBSITE)],
                 True,
             ),
             # Without permission
             (
+                AccessView.WEBSITE,
                 [(ACTION_CAN_READ, "resource_test"), (ACTION_CAN_CREATE, 
RESOURCE_WEBSITE)],
                 False,
             ),
+            # Without permission
+            (
+                AccessView.WEBSITE,
+                [(ACTION_CAN_READ, RESOURCE_TRIGGER)],
+                False,
+            ),
         ],
     )
-    def test_is_authorized_website(self, user_permissions, expected_result, 
auth_manager):
+    def test_is_authorized_view(self, access_view, user_permissions, 
expected_result, auth_manager):
         user = Mock()
         user.perms = user_permissions
-        result = auth_manager.is_authorized_website(user=user)
+        result = auth_manager.is_authorized_view(access_view=access_view, 
user=user)
         assert result == expected_result
 
     @pytest.mark.db_test
diff --git a/tests/auth/managers/test_base_auth_manager.py 
b/tests/auth/managers/test_base_auth_manager.py
index 35e23e631f..f009fb055b 100644
--- a/tests/auth/managers/test_base_auth_manager.py
+++ b/tests/auth/managers/test_base_auth_manager.py
@@ -23,12 +23,15 @@ import pytest
 from flask import Flask
 
 from airflow.auth.managers.base_auth_manager import BaseAuthManager, 
ResourceMethod
+from airflow.exceptions import AirflowException
+from airflow.security import permissions
 from airflow.www.extensions.init_appbuilder import init_appbuilder
 from airflow.www.security_manager import AirflowSecurityManagerV2
 
 if TYPE_CHECKING:
     from airflow.auth.managers.models.base_user import BaseUser
     from airflow.auth.managers.models.resource_details import (
+        AccessView,
         ConfigurationDetails,
         ConnectionDetails,
         DagAccessEntity,
@@ -98,7 +101,7 @@ class EmptyAuthManager(BaseAuthManager):
     ) -> bool:
         raise NotImplementedError()
 
-    def is_authorized_website(self, *, user: BaseUser | None = None) -> bool:
+    def is_authorized_view(self, *, access_view: AccessView, user: BaseUser | 
None = None) -> bool:
         raise NotImplementedError()
 
     def is_logged_in(self) -> bool:
@@ -133,6 +136,13 @@ class TestBaseAuthManager:
     def test_get_api_endpoints_return_none(self, auth_manager):
         assert auth_manager.get_api_endpoints() is None
 
+    def test_is_authorized_custom_view_throws_exception(self, auth_manager):
+        with pytest.raises(AirflowException, match="The resource `.*` does not 
exist in the environment."):
+            auth_manager.is_authorized_custom_view(
+                fab_action_name=permissions.ACTION_CAN_READ,
+                fab_resource_name=permissions.RESOURCE_MY_PASSWORD,
+            )
+
     @pytest.mark.db_test
     def test_security_manager_return_default_security_manager(self, 
auth_manager_with_appbuilder):
         assert isinstance(auth_manager_with_appbuilder.security_manager, 
AirflowSecurityManagerV2)
diff --git a/tests/www/test_security.py b/tests/www/test_security.py
index 641b394808..d7aaa0d29e 100644
--- a/tests/www/test_security.py
+++ b/tests/www/test_security.py
@@ -228,7 +228,9 @@ def sample_dags(security_manager):
 def has_dag_perm(security_manager):
     def _has_dag_perm(perm, dag_id, user):
         root_dag_id = security_manager._get_root_dag_id(dag_id)
-        return security_manager.has_access(perm, 
permissions.resource_name_for_dag(root_dag_id), user)
+        return get_auth_manager().is_authorized_dag(
+            method=perm, details=DagDetails(id=root_dag_id), user=user
+        )
 
     return _has_dag_perm
 
@@ -358,8 +360,8 @@ def 
test_verify_default_anon_user_has_no_access_to_specific_dag(app, session, se
 
             assert _can_read_dag(dag_id, user) is False
             assert _can_edit_dag(dag_id, user) is False
-            assert has_dag_perm(permissions.ACTION_CAN_READ, dag_id, user) is 
False
-            assert has_dag_perm(permissions.ACTION_CAN_EDIT, dag_id, user) is 
False
+            assert has_dag_perm("GET", dag_id, user) is False
+            assert has_dag_perm("PUT", dag_id, user) is False
 
 
 @patch.object(FabAuthManager, "is_logged_in")
@@ -403,8 +405,8 @@ def 
test_verify_anon_user_with_admin_role_has_access_to_each_dag(
 
                 assert _can_read_dag(dag_id, user) is True
                 assert _can_edit_dag(dag_id, user) is True
-                assert has_dag_perm(permissions.ACTION_CAN_READ, dag_id, user) 
is True
-                assert has_dag_perm(permissions.ACTION_CAN_EDIT, dag_id, user) 
is True
+                assert has_dag_perm("GET", dag_id, user) is True
+                assert has_dag_perm("PUT", dag_id, user) is True
 
 
 def test_get_user_roles(app_builder, security_manager):
@@ -800,7 +802,7 @@ def test_access_control_is_set_on_init(
                 access_control={role_name: [permissions.ACTION_CAN_EDIT, 
permissions.ACTION_CAN_READ]},
             )
             assert_user_has_dag_perms(
-                perms=[permissions.ACTION_CAN_EDIT, 
permissions.ACTION_CAN_READ],
+                perms=["PUT", "GET"],
                 dag_id="access_control_test",
                 user=user,
             )
@@ -808,7 +810,7 @@ def test_access_control_is_set_on_init(
             security_manager.bulk_sync_roles([{"role": negated_role, "perms": 
[]}])
             set_user_single_role(app, user, role_name=negated_role)
             assert_user_does_not_have_dag_perms(
-                perms=[permissions.ACTION_CAN_EDIT, 
permissions.ACTION_CAN_READ],
+                perms=["PUT", "GET"],
                 dag_id="access_control_test",
                 user=user,
             )
@@ -833,19 +835,15 @@ def test_access_control_stale_perms_are_revoked(
             security_manager._sync_dag_view_permissions(
                 "access_control_test", access_control={"team-a": READ_WRITE}
             )
-            assert_user_has_dag_perms(perms=READ_WRITE, 
dag_id="access_control_test", user=user)
+            assert_user_has_dag_perms(perms=["GET", "PUT"], 
dag_id="access_control_test", user=user)
 
             security_manager._sync_dag_view_permissions(
                 "access_control_test", access_control={"team-a": READ_ONLY}
             )
             # Clear the cache, to make it pick up new rol perms
             user._perms = None
-            assert_user_has_dag_perms(
-                perms=[permissions.ACTION_CAN_READ], 
dag_id="access_control_test", user=user
-            )
-            assert_user_does_not_have_dag_perms(
-                perms=[permissions.ACTION_CAN_EDIT], 
dag_id="access_control_test", user=user
-            )
+            assert_user_has_dag_perms(perms=["GET"], 
dag_id="access_control_test", user=user)
+            assert_user_does_not_have_dag_perms(perms=["PUT"], 
dag_id="access_control_test", user=user)
 
 
 def test_no_additional_dag_permission_views_created(db, security_manager):
@@ -1003,10 +1001,10 @@ def test_parent_dag_access_applies_to_subdag(app, 
security_manager, assert_user_
                     parent_dag_name, access_control={role_name: READ_WRITE}
                 )
 
-            assert_user_has_dag_perms(perms=READ_WRITE, 
dag_id=parent_dag_name, user=user)
-            assert_user_has_dag_perms(perms=READ_WRITE, dag_id=parent_dag_name 
+ ".subdag", user=user)
+            assert_user_has_dag_perms(perms=["GET", "PUT"], 
dag_id=parent_dag_name, user=user)
+            assert_user_has_dag_perms(perms=["GET", "PUT"], 
dag_id=parent_dag_name + ".subdag", user=user)
             assert_user_has_dag_perms(
-                perms=READ_WRITE, dag_id=parent_dag_name + 
".subdag.subsubdag", user=user
+                perms=["GET", "PUT"], dag_id=parent_dag_name + 
".subdag.subsubdag", user=user
             )
             session.query(DagModel).delete()
 
@@ -1040,8 +1038,8 @@ def test_permissions_work_for_dags_with_dot_in_dagname(
             security_manager.bulk_sync_roles(mock_roles)
             security_manager.sync_perm_for_dag(dag1.dag_id, 
access_control={role_name: READ_WRITE})
             security_manager.sync_perm_for_dag(dag2.dag_id, 
access_control={role_name: READ_WRITE})
-            assert_user_has_dag_perms(perms=READ_WRITE, dag_id=dag_id, 
user=user)
-            assert_user_does_not_have_dag_perms(perms=READ_WRITE, 
dag_id=dag_id_2, user=user)
+            assert_user_has_dag_perms(perms=["GET", "PUT"], dag_id=dag_id, 
user=user)
+            assert_user_does_not_have_dag_perms(perms=["GET", "PUT"], 
dag_id=dag_id_2, user=user)
             session.query(DagModel).delete()
 
 
diff --git a/tests/www/views/test_views_dagrun.py 
b/tests/www/views/test_views_dagrun.py
index 1ea2ab3f61..eccf73c6b5 100644
--- a/tests/www/views/test_views_dagrun.py
+++ b/tests/www/views/test_views_dagrun.py
@@ -20,7 +20,6 @@ from __future__ import annotations
 import flask
 import markupsafe
 import pytest
-import werkzeug
 
 from airflow.models import DagBag, DagRun, TaskInstance
 from airflow.security import permissions
@@ -60,6 +59,32 @@ def client_dr_without_dag_edit(app):
     delete_roles(app)
 
 
+@pytest.fixture(scope="module")
+def client_dr_without_dag_run_create(app):
+    create_user(
+        app,
+        username="all_dr_permissions_except_dag_run_create",
+        role_name="all_dr_permissions_except_dag_run_create",
+        permissions=[
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE),
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG_RUN),
+            (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAG_RUN),
+            (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG_RUN),
+            (permissions.ACTION_CAN_ACCESS_MENU, permissions.RESOURCE_DAG_RUN),
+        ],
+    )
+
+    yield client_with_login(
+        app,
+        username="all_dr_permissions_except_dag_run_create",
+        password="all_dr_permissions_except_dag_run_create",
+    )
+
+    delete_user(app, username="all_dr_permissions_except_dag_run_create")  # 
type: ignore
+    delete_roles(app)
+
+
 @pytest.fixture(scope="module", autouse=True)
 def init_blank_dagrun():
     """Make sure there are no runs before we test anything.
@@ -94,7 +119,7 @@ def 
test_get_dagrun_can_view_dags_without_edit_perms(session, running_dag_run, c
     check_content_in_response(dag_url_link, resp)
 
 
-def test_create_dagrun_permission_denied(session, client_dr_without_dag_edit):
+def test_create_dagrun_permission_denied(session, 
client_dr_without_dag_run_create):
     data = {
         "state": "running",
         "dag_id": "example_bash_operator",
@@ -103,8 +128,8 @@ def test_create_dagrun_permission_denied(session, 
client_dr_without_dag_edit):
         "conf": '{"include": "me"}',
     }
 
-    with pytest.raises(werkzeug.test.ClientRedirectError):
-        client_dr_without_dag_edit.post("/dagrun/add", data=data, 
follow_redirects=True)
+    resp = client_dr_without_dag_run_create.post("/dagrun/add", data=data, 
follow_redirects=True)
+    check_content_in_response("Access is Denied", resp)
 
 
 @pytest.fixture()
diff --git a/tests/www/views/test_views_tasks.py 
b/tests/www/views/test_views_tasks.py
index fbe99afeb3..c22ea5e2e7 100644
--- a/tests/www/views/test_views_tasks.py
+++ b/tests/www/views/test_views_tasks.py
@@ -130,12 +130,14 @@ def client_ti_without_dag_edit(app):
         username="all_ti_permissions_except_dag_edit",
         role_name="all_ti_permissions_except_dag_edit",
         permissions=[
+            (permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE),
             (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAG),
             (permissions.ACTION_CAN_CREATE, 
permissions.RESOURCE_TASK_INSTANCE),
             (permissions.ACTION_CAN_READ, permissions.RESOURCE_TASK_INSTANCE),
             (permissions.ACTION_CAN_EDIT, permissions.RESOURCE_TASK_INSTANCE),
             (permissions.ACTION_CAN_DELETE, 
permissions.RESOURCE_TASK_INSTANCE),
             (permissions.ACTION_CAN_ACCESS_MENU, 
permissions.RESOURCE_TASK_INSTANCE),
+            (permissions.ACTION_CAN_DELETE, permissions.RESOURCE_DAG_RUN),
         ],
     )
 

Reply via email to