This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ab6210  Prepend `DAG:` to dag permissions (#11189)
7ab6210 is described below

commit 7ab62100af8a59694721e06b213a933869c6a1ed
Author: James Timmins <ja...@astronomer.io>
AuthorDate: Thu Oct 15 16:32:38 2020 -0700

    Prepend `DAG:` to dag permissions (#11189)
    
    This adds the prefix DAG: to newly created dag permissions. It supports 
checking permissions on both prefixed and un-prefixed DAG permission names.
    
    This will make it easier to identify permissions that related to granular 
dag access.
    
    This PR does not modify existing dag permission names to use the new 
prefixed naming scheme. That will come in a separate PR.
    
    Related to issue #10469
---
 UPDATING.md                                        |  10 +
 airflow/api_connexion/endpoints/dag_endpoint.py    |   9 +-
 .../api_connexion/endpoints/dag_run_endpoint.py    |  11 +-
 .../api_connexion/endpoints/extra_link_endpoint.py |   3 +-
 airflow/api_connexion/endpoints/log_endpoint.py    |   5 +-
 airflow/api_connexion/endpoints/task_endpoint.py   |   5 +-
 .../endpoints/task_instance_endpoint.py            |  17 +-
 airflow/api_connexion/endpoints/xcom_endpoint.py   |  15 +-
 airflow/api_connexion/security.py                  |   6 +-
 .../849da589634d_prefix_dag_permissions.py         | 115 +++++++++++
 airflow/models/dag.py                              |  33 +++-
 airflow/security/permissions.py                    |  28 +++
 airflow/www/app.py                                 |   1 +
 airflow/www/decorators.py                          |  50 +++--
 airflow/www/security.py                            | 166 ++++++++++------
 airflow/www/views.py                               |  13 +-
 tests/api_connexion/endpoints/test_dag_endpoint.py |   9 +-
 .../endpoints/test_dag_run_endpoint.py             |   3 +-
 .../endpoints/test_extra_link_endpoint.py          |   3 +-
 tests/api_connexion/endpoints/test_log_endpoint.py |   7 +-
 .../api_connexion/endpoints/test_task_endpoint.py  |   7 +-
 .../endpoints/test_task_instance_endpoint.py       |   3 +-
 .../api_connexion/endpoints/test_xcom_endpoint.py  |   3 +-
 tests/cli/commands/test_sync_perm_command.py       |   4 +-
 tests/models/test_dag.py                           |  13 ++
 tests/serialization/test_dag_serialization.py      |   6 +-
 tests/test_utils/fab_utils.py                      |  80 ++++++++
 tests/www/test_security.py                         | 220 +++++++++++----------
 tests/www/test_views.py                            |  57 +++---
 29 files changed, 643 insertions(+), 259 deletions(-)

diff --git a/UPDATING.md b/UPDATING.md
index 230a2a6..75a1d18 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -50,6 +50,16 @@ assists users migrating to a new version.
 
 ## Airflow Master
 
+### Change to Permissions
+
+The DAG-level permission actions, `can_dag_read` and `can_dag_edit` are going 
away. They are being replaced with `can_read` and `can_edit`. When a role is 
given DAG-level access, the resource name (or "view menu", in Flask App-Builder 
parlance) will now be prefixed with `DAG:`. So the action `can_dag_read` on 
`example_dag_id`, is now represented as `can_read` on `DAG:example_dag_id`.
+
+*As part of running `db upgrade`, existing permissions will be migrated for 
you.*
+
+When DAGs are initialized with the `access_control` variable set, any usage of 
the old permission names will automatically be updated in the database, so this 
won't be a breaking change. A DeprecationWarning will be raised.
+
+### Changes to Airflow Plugins
+
 If you are using Airflow Plugins and were passing `admin_views` & `menu_links` 
which were used in the
 non-RBAC UI (`flask-admin` based UI), upto it to use `flask_appbuilder_views` 
and `flask_appbuilder_menu_links`.
 
diff --git a/airflow/api_connexion/endpoints/dag_endpoint.py 
b/airflow/api_connexion/endpoints/dag_endpoint.py
index 7c32a43..33c3643 100644
--- a/airflow/api_connexion/endpoints/dag_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_endpoint.py
@@ -28,10 +28,11 @@ from airflow.api_connexion.schemas.dag_schema import (
     dags_collection_schema,
 )
 from airflow.models.dag import DagModel
+from airflow.security import permissions
 from airflow.utils.session import provide_session
 
 
-@security.requires_access([("can_read", "Dag")])
+@security.requires_access([("can_read", permissions.RESOURCE_DAGS)])
 @provide_session
 def get_dag(dag_id, session):
     """
@@ -45,7 +46,7 @@ def get_dag(dag_id, session):
     return dag_schema.dump(dag)
 
 
-@security.requires_access([("can_read", "Dag")])
+@security.requires_access([("can_read", permissions.RESOURCE_DAGS)])
 def get_dag_details(dag_id):
     """
     Get details of DAG.
@@ -56,7 +57,7 @@ def get_dag_details(dag_id):
     return dag_detail_schema.dump(dag)
 
 
-@security.requires_access([("can_read", "Dag")])
+@security.requires_access([("can_read", permissions.RESOURCE_DAGS)])
 @format_parameters({'limit': check_limit})
 def get_dags(limit, offset=0):
     """
@@ -69,7 +70,7 @@ def get_dags(limit, offset=0):
     return dags_collection_schema.dump(DAGCollection(dags=dags, 
total_entries=total_entries))
 
 
-@security.requires_access([("can_edit", "Dag")])
+@security.requires_access([("can_edit", permissions.RESOURCE_DAGS)])
 @provide_session
 def patch_dag(session, dag_id, update_mask=None):
     """
diff --git a/airflow/api_connexion/endpoints/dag_run_endpoint.py 
b/airflow/api_connexion/endpoints/dag_run_endpoint.py
index 8f2a3e0..2d3a297 100644
--- a/airflow/api_connexion/endpoints/dag_run_endpoint.py
+++ b/airflow/api_connexion/endpoints/dag_run_endpoint.py
@@ -28,11 +28,12 @@ from airflow.api_connexion.schemas.dag_run_schema import (
     dagruns_batch_form_schema,
 )
 from airflow.models import DagModel, DagRun
+from airflow.security import permissions
 from airflow.utils.session import provide_session
 from airflow.utils.types import DagRunType
 
 
-@security.requires_access([("can_read", "Dag"), ("can_delete", "DagRun")])
+@security.requires_access([("can_read", permissions.RESOURCE_DAGS), 
("can_delete", "DagRun")])
 @provide_session
 def delete_dag_run(dag_id, dag_run_id, session):
     """
@@ -43,7 +44,7 @@ def delete_dag_run(dag_id, dag_run_id, session):
     return NoContent, 204
 
 
-@security.requires_access([("can_read", "Dag"), ("can_read", "DagRun")])
+@security.requires_access([("can_read", permissions.RESOURCE_DAGS), 
("can_read", "DagRun")])
 @provide_session
 def get_dag_run(dag_id, dag_run_id, session):
     """
@@ -58,7 +59,7 @@ def get_dag_run(dag_id, dag_run_id, session):
     return dagrun_schema.dump(dag_run)
 
 
-@security.requires_access([("can_read", "Dag"), ("can_read", "DagRun")])
+@security.requires_access([("can_read", permissions.RESOURCE_DAGS), 
("can_read", "DagRun")])
 @format_parameters(
     {
         'start_date_gte': format_datetime,
@@ -157,7 +158,7 @@ def _apply_date_filters_to_query(
     return query
 
 
-@security.requires_access([("can_read", "Dag"), ("can_read", "DagRun")])
+@security.requires_access([("can_read", permissions.RESOURCE_DAGS), 
("can_read", "DagRun")])
 @provide_session
 def get_dag_runs_batch(session):
     """
@@ -193,7 +194,7 @@ def get_dag_runs_batch(session):
     return dagrun_collection_schema.dump(DAGRunCollection(dag_runs=dag_runs, 
total_entries=total_entries))
 
 
-@security.requires_access([("can_read", "Dag"), ("can_create", "DagRun")])
+@security.requires_access([("can_read", permissions.RESOURCE_DAGS), 
("can_create", "DagRun")])
 @provide_session
 def post_dag_run(dag_id, session):
     """
diff --git a/airflow/api_connexion/endpoints/extra_link_endpoint.py 
b/airflow/api_connexion/endpoints/extra_link_endpoint.py
index e831d1b..4ec0554 100644
--- a/airflow/api_connexion/endpoints/extra_link_endpoint.py
+++ b/airflow/api_connexion/endpoints/extra_link_endpoint.py
@@ -23,12 +23,13 @@ from airflow.api_connexion.exceptions import NotFound
 from airflow.exceptions import TaskNotFound
 from airflow.models.dagbag import DagBag
 from airflow.models.dagrun import DagRun as DR
+from airflow.security import permissions
 from airflow.utils.session import provide_session
 
 
 @security.requires_access(
     [
-        ('can_read', 'Dag'),
+        ('can_read', permissions.RESOURCE_DAGS),
         ('can_read', 'DagRun'),
         ('can_read', 'Task'),
         ('can_read', 'TaskInstance'),
diff --git a/airflow/api_connexion/endpoints/log_endpoint.py 
b/airflow/api_connexion/endpoints/log_endpoint.py
index 4b48ed0..32d5396 100644
--- a/airflow/api_connexion/endpoints/log_endpoint.py
+++ b/airflow/api_connexion/endpoints/log_endpoint.py
@@ -23,11 +23,14 @@ from airflow.api_connexion import security
 from airflow.api_connexion.exceptions import BadRequest, NotFound
 from airflow.api_connexion.schemas.log_schema import LogResponseObject, 
logs_schema
 from airflow.models import DagRun
+from airflow.security import permissions
 from airflow.utils.log.log_reader import TaskLogReader
 from airflow.utils.session import provide_session
 
 
-@security.requires_access([('can_read', 'Dag'), ('can_read', 'DagRun'), 
('can_read', 'Task')])
+@security.requires_access(
+    [('can_read', permissions.RESOURCE_DAGS), ('can_read', 'DagRun'), 
('can_read', 'Task')]
+)
 @provide_session
 def get_log(session, dag_id, dag_run_id, task_id, task_try_number, 
full_content=False, token=None):
     """
diff --git a/airflow/api_connexion/endpoints/task_endpoint.py 
b/airflow/api_connexion/endpoints/task_endpoint.py
index 4626962..d818ebb 100644
--- a/airflow/api_connexion/endpoints/task_endpoint.py
+++ b/airflow/api_connexion/endpoints/task_endpoint.py
@@ -21,9 +21,10 @@ from airflow.api_connexion import security
 from airflow.api_connexion.exceptions import NotFound
 from airflow.api_connexion.schemas.task_schema import TaskCollection, 
task_collection_schema, task_schema
 from airflow.exceptions import TaskNotFound
+from airflow.security import permissions
 
 
-@security.requires_access([("can_read", "Dag"), ("can_read", "Task")])
+@security.requires_access([("can_read", permissions.RESOURCE_DAGS), 
("can_read", "Task")])
 def get_task(dag_id, task_id):
     """
     Get simplified representation of a task.
@@ -39,7 +40,7 @@ def get_task(dag_id, task_id):
     return task_schema.dump(task)
 
 
-@security.requires_access([("can_read", "Dag"), ("can_read", "Task")])
+@security.requires_access([("can_read", permissions.RESOURCE_DAGS), 
("can_read", "Task")])
 def get_tasks(dag_id):
     """
     Get tasks for DAG
diff --git a/airflow/api_connexion/endpoints/task_instance_endpoint.py 
b/airflow/api_connexion/endpoints/task_instance_endpoint.py
index cd66bc3..b9ee25f 100644
--- a/airflow/api_connexion/endpoints/task_instance_endpoint.py
+++ b/airflow/api_connexion/endpoints/task_instance_endpoint.py
@@ -37,13 +37,14 @@ from airflow.api_connexion.schemas.task_instance_schema 
import (
 from airflow.models.dagrun import DagRun as DR
 from airflow.models.taskinstance import clear_task_instances, TaskInstance as 
TI
 from airflow.models import SlaMiss
+from airflow.security import permissions
 from airflow.utils.state import State
 from airflow.utils.session import provide_session
 
 
 @security.requires_access(
     [
-        ("can_read", "Dag"),
+        ("can_read", permissions.RESOURCE_DAGS),
         ("can_read", "DagRun"),
         ("can_read", "Task"),
     ]
@@ -101,7 +102,7 @@ def _apply_range_filter(query, key, value_range: Tuple[Any, 
Any]):
 )
 @security.requires_access(
     [
-        ("can_read", "Dag"),
+        ("can_read", permissions.RESOURCE_DAGS),
         ("can_read", "DagRun"),
         ("can_read", "Task"),
     ]
@@ -169,7 +170,9 @@ def get_task_instances(
     )
 
 
-@security.requires_access([("can_read", "Dag"), ("can_read", "DagRun"), 
("can_read", "Task")])
+@security.requires_access(
+    [("can_read", permissions.RESOURCE_DAGS), ("can_read", "DagRun"), 
("can_read", "Task")]
+)
 @provide_session
 def get_task_instances_batch(session=None):
     """
@@ -223,7 +226,9 @@ def get_task_instances_batch(session=None):
     )
 
 
-@security.requires_access([("can_read", "Dag"), ("can_read", "DagRun"), 
("can_edit", "Task")])
+@security.requires_access(
+    [("can_read", permissions.RESOURCE_DAGS), ("can_read", "DagRun"), 
("can_edit", "Task")]
+)
 @provide_session
 def post_clear_task_instances(dag_id: str, session=None):
     """
@@ -263,7 +268,9 @@ def post_clear_task_instances(dag_id: str, session=None):
     )
 
 
-@security.requires_access([("can_read", "Dag"), ("can_read", "DagRun"), 
("can_edit", "Task")])
+@security.requires_access(
+    [("can_read", permissions.RESOURCE_DAGS), ("can_read", "DagRun"), 
("can_edit", "Task")]
+)
 @provide_session
 def post_set_task_instances_state(dag_id, session):
     """Set a state of task instances."""
diff --git a/airflow/api_connexion/endpoints/xcom_endpoint.py 
b/airflow/api_connexion/endpoints/xcom_endpoint.py
index 9d8e888..39dc503 100644
--- a/airflow/api_connexion/endpoints/xcom_endpoint.py
+++ b/airflow/api_connexion/endpoints/xcom_endpoint.py
@@ -31,11 +31,17 @@ from airflow.api_connexion.schemas.xcom_schema import (
     xcom_collection_schema,
 )
 from airflow.models import DagRun as DR, XCom
+from airflow.security import permissions
 from airflow.utils.session import provide_session
 
 
 @security.requires_access(
-    [("can_read", "Dag"), ("can_read", "DagRun"), ("can_read", "Task"), 
("can_read", "XCom")]
+    [
+        ("can_read", permissions.RESOURCE_DAGS),
+        ("can_read", "DagRun"),
+        ("can_read", "Task"),
+        ("can_read", "XCom"),
+    ]
 )
 @format_parameters({'limit': check_limit})
 @provide_session
@@ -71,7 +77,12 @@ def get_xcom_entries(
 
 
 @security.requires_access(
-    [("can_read", "Dag"), ("can_read", "DagRun"), ("can_read", "Task"), 
("can_read", "XCom")]
+    [
+        ("can_read", permissions.RESOURCE_DAGS),
+        ("can_read", "DagRun"),
+        ("can_read", "Task"),
+        ("can_read", "XCom"),
+    ]
 )
 @provide_session
 def get_xcom_entry(
diff --git a/airflow/api_connexion/security.py 
b/airflow/api_connexion/security.py
index 80afe12..373a700 100644
--- a/airflow/api_connexion/security.py
+++ b/airflow/api_connexion/security.py
@@ -20,6 +20,7 @@ from typing import Callable, Optional, Sequence, Tuple, 
TypeVar, cast
 
 from flask import Response, current_app, g
 
+from airflow.security.permissions import RESOURCE_DAGS
 from airflow.api_connexion.exceptions import PermissionDenied, Unauthenticated
 
 T = TypeVar("T", bound=Callable)  # pylint: disable=invalid-name
@@ -38,7 +39,7 @@ def can_access_any_dags(action: str, dag_id: Optional[int] = 
None) -> bool:
     """Checks if user has read or write access to some dags."""
     appbuilder = current_app.appbuilder
     if dag_id and dag_id != '~':
-        return appbuilder.sm.has_access(action, dag_id)
+        return appbuilder.sm.has_access(action, 
appbuilder.sm.prefixed_dag_id(dag_id))
 
     user = g.user
     if action == 'can_read':
@@ -54,7 +55,7 @@ def check_authorization(
         return
     appbuilder = current_app.appbuilder
     for permission in permissions:
-        if permission in (('can_read', 'Dag'), ('can_edit', 'Dag')):
+        if permission in (('can_read', RESOURCE_DAGS), ('can_edit', 
RESOURCE_DAGS)):
             can_access_all_dags = appbuilder.sm.has_access(*permission)
             if can_access_all_dags:
                 continue
@@ -79,7 +80,6 @@ def requires_access(permissions: Optional[Sequence[Tuple[str, 
str]]] = None) ->
     def requires_access_decorator(func: T):
         @wraps(func)
         def decorated(*args, **kwargs):
-
             check_authentication()
             check_authorization(permissions, kwargs.get('dag_id'))
 
diff --git a/airflow/migrations/versions/849da589634d_prefix_dag_permissions.py 
b/airflow/migrations/versions/849da589634d_prefix_dag_permissions.py
new file mode 100644
index 0000000..23fd96a
--- /dev/null
+++ b/airflow/migrations/versions/849da589634d_prefix_dag_permissions.py
@@ -0,0 +1,115 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Prefix DAG permissions.
+
+Revision ID: 849da589634d
+Revises: 52d53670a240
+Create Date: 2020-10-01 17:25:10.006322
+
+"""
+
+from airflow.security import permissions
+from airflow.www.app import cached_app
+
+# revision identifiers, used by Alembic.
+revision = '849da589634d'
+down_revision = '52d53670a240'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():   # noqa: D103
+    permissions = ['can_dag_read', 'can_dag_edit']
+    view_menus = cached_app().appbuilder.sm.get_all_view_menu()
+    convert_permissions(permissions, view_menus, upgrade_action, 
upgrade_dag_id)
+
+
+def downgrade():   # noqa: D103
+    permissions = ['can_read', 'can_edit']
+    vms = cached_app().appbuilder.sm.get_all_view_menu()
+    view_menus = [
+        vm for vm in vms if (vm.name == permissions.RESOURCE_DAGS or 
vm.name.startswith('DAG:'))
+    ]
+    convert_permissions(permissions, view_menus, downgrade_action, 
downgrade_dag_id)
+
+
+def upgrade_dag_id(dag_id):
+    """Adds the 'DAG:' prefix to a DAG view if appropriate."""
+    if dag_id == 'all_dags':
+        return permissions.RESOURCE_DAGS
+    if dag_id.startswith("DAG:"):
+        return dag_id
+    return f"DAG:{dag_id}"
+
+
+def downgrade_dag_id(dag_id):
+    """Removes the 'DAG:' prefix from a DAG view name to return the DAG id."""
+    if dag_id == permissions.RESOURCE_DAGS:
+        return 'all_dags'
+    if dag_id.startswith("DAG:"):
+        return dag_id[len("DAG:"):]
+    return dag_id
+
+
+def upgrade_action(action):
+    """Converts the a DAG permission name from the old style to the new 
style."""
+    if action == 'can_dag_read':
+        return 'can_read'
+    return 'can_edit'
+
+
+def downgrade_action(action):
+    """Converts the a DAG permission name from the old style to the new 
style."""
+    if action == 'can_read':
+        return 'can_dag_read'
+    return 'can_dag_edit'
+
+
+def convert_permissions(permissions, view_menus, convert_action, 
convert_dag_id):
+    """Creates new empty role in DB"""
+    appbuilder = cached_app().appbuilder  # pylint: disable=no-member
+    roles = appbuilder.sm.get_all_roles()
+    views_to_remove = set()
+    for permission_name in permissions:  # pylint: 
disable=too-many-nested-blocks
+        for view_menu in view_menus:
+            view_name = view_menu.name
+            old_pvm = appbuilder.sm.find_permission_view_menu(permission_name, 
view_name)
+            if not old_pvm:
+                continue
+
+            views_to_remove.add(view_name)
+            new_permission_name = convert_action(permission_name)
+            new_pvm = 
appbuilder.sm.add_permission_view_menu(new_permission_name, 
convert_dag_id(view_name))
+            for role in roles:
+                if appbuilder.sm.exist_permission_on_roles(view_name, 
permission_name, [role.id]):
+                    appbuilder.sm.add_permission_role(role, new_pvm)
+                    appbuilder.sm.del_permission_role(role, old_pvm)
+                    print(f"DELETING: {role.name}  ---->   
{view_name}.{permission_name}")
+            appbuilder.sm.del_permission_view_menu(permission_name, view_name)
+            print(f"DELETING: perm_view  ---->   
{view_name}.{permission_name}")
+    for view_name in views_to_remove:
+        if appbuilder.sm.find_view_menu(view_name):
+            appbuilder.sm.del_view_menu(view_name)
+            print(f"DELETING: view_menu  ---->   {view_name}")
+
+    if 'can_dag_read' in permissions:
+        for permission_name in permissions:
+            if appbuilder.sm.find_permission(permission_name):
+                appbuilder.sm.del_permission(permission_name)
+                print(f"DELETING: permission  ---->   {permission_name}")
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index 3553920..45269f3 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -51,6 +51,7 @@ from airflow.models.dagcode import DagCode
 from airflow.models.dagpickle import DagPickle
 from airflow.models.dagrun import DagRun
 from airflow.models.taskinstance import Context, TaskInstance, 
clear_task_instances
+from airflow.security import permissions
 from airflow.stats import Stats
 from airflow.utils import timezone
 from airflow.utils.dates import cron_presets, date_range as utils_date_range
@@ -176,7 +177,7 @@ class DAG(BaseDag, LoggingMixin):
         that it is executed when the dag succeeds.
     :type on_success_callback: callable
     :param access_control: Specify optional DAG-level permissions, e.g.,
-        "{'role1': {'can_dag_read'}, 'role2': {'can_dag_read', 
'can_dag_edit'}}"
+        "{'role1': {'can_read'}, 'role2': {'can_read', 'can_edit'}}"
     :type access_control: dict
     :param is_paused_upon_creation: Specifies if the dag is paused when 
created for the first time.
         If the dag exists already, this flag will be ignored. If this optional 
parameter
@@ -332,7 +333,7 @@ class DAG(BaseDag, LoggingMixin):
         self.on_failure_callback = on_failure_callback
         self.doc_md = doc_md
 
-        self._access_control = access_control
+        self._access_control = 
DAG._upgrade_outdated_dag_access_control(access_control)
         self.is_paused_upon_creation = is_paused_upon_creation
 
         self.jinja_environment_kwargs = jinja_environment_kwargs
@@ -382,6 +383,32 @@ class DAG(BaseDag, LoggingMixin):
 
     # /Context Manager ----------------------------------------------
 
+    @staticmethod
+    def _upgrade_outdated_dag_access_control(access_control=None):
+        """
+        Looks for outdated dag level permissions (can_dag_read and 
can_dag_edit) in DAG
+        access_controls (for example, {'role1': {'can_dag_read'}, 'role2': 
{'can_dag_read', 'can_dag_edit'}})
+        and replaces them with updated permissions (can_read and can_edit).
+        """
+        if not access_control:
+            return None
+        new_perm_mapping = {
+            permissions.DEPRECATED_ACTION_CAN_DAG_READ: 
permissions.ACTION_CAN_READ,
+            permissions.DEPRECATED_ACTION_CAN_DAG_EDIT: 
permissions.ACTION_CAN_EDIT,
+        }
+        updated_access_control = {}
+        for role, perms in access_control.items():
+            updated_access_control[role] = {new_perm_mapping.get(perm, perm) 
for perm in perms}
+
+        if access_control != updated_access_control:
+            warnings.warn(
+                "The 'can_dag_read' and 'can_dag_edit' permissions are 
deprecated. "
+                "Please use 'can_read' and 'can_edit', respectively.",
+                DeprecationWarning, stacklevel=3
+            )
+
+        return updated_access_control
+
     def date_range(
         self,
         start_date: datetime,
@@ -651,7 +678,7 @@ class DAG(BaseDag, LoggingMixin):
 
     @access_control.setter
     def access_control(self, value):
-        self._access_control = value
+        self._access_control = DAG._upgrade_outdated_dag_access_control(value)
 
     @property
     def description(self) -> Optional[str]:
diff --git a/airflow/security/permissions.py b/airflow/security/permissions.py
new file mode 100644
index 0000000..94d2cc2
--- /dev/null
+++ b/airflow/security/permissions.py
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Resource Constants
+RESOURCE_DAGS = 'Dags'
+RESOURCE_DAG_PREFIX = 'DAG:'
+
+# Action Constants
+ACTION_CAN_CREATE = 'can_create'
+ACTION_CAN_READ = 'can_read'
+ACTION_CAN_EDIT = 'can_edit'
+ACTION_CAN_DELETE = 'can_delete'
+DEPRECATED_ACTION_CAN_DAG_READ = 'can_dag_read'
+DEPRECATED_ACTION_CAN_DAG_EDIT = 'can_dag_edit'
diff --git a/airflow/www/app.py b/airflow/www/app.py
index e6f3104..21d7d3b 100644
--- a/airflow/www/app.py
+++ b/airflow/www/app.py
@@ -58,6 +58,7 @@ def sync_appbuilder_roles(flask_app):
     if conf.getboolean('webserver', 'UPDATE_FAB_PERMS'):
         security_manager = flask_app.appbuilder.sm
         security_manager.sync_roles()
+        security_manager.sync_resource_permissions()
 
 
 def create_app(config=None, testing=False, app_name="Airflow"):
diff --git a/airflow/www/decorators.py b/airflow/www/decorators.py
index d73967a..4ee0af9 100644
--- a/airflow/www/decorators.py
+++ b/airflow/www/decorators.py
@@ -34,6 +34,7 @@ def action_logging(f: T) -> T:
     """
     Decorator to log user actions
     """
+
     @functools.wraps(f)
     def wrapper(*args, **kwargs):
 
@@ -50,11 +51,11 @@ def action_logging(f: T) -> T:
                 owner=user,
                 extra=str([(k, v) for k, v in request.values.items() if k not 
in fields_skip_logging]),
                 task_id=request.values.get('task_id'),
-                dag_id=request.values.get('dag_id'))
+                dag_id=request.values.get('dag_id'),
+            )
 
             if 'execution_date' in request.values:
-                log.execution_date = pendulum.parse(
-                    request.values.get('execution_date'), strict=False)
+                log.execution_date = 
pendulum.parse(request.values.get('execution_date'), strict=False)
 
             session.add(log)
 
@@ -67,6 +68,7 @@ def gzipped(f: T) -> T:
     """
     Decorator to make a view compressed
     """
+
     @functools.wraps(f)
     def view_func(*args, **kwargs):
         @after_this_request
@@ -78,12 +80,14 @@ def gzipped(f: T) -> T:
 
             response.direct_passthrough = False
 
-            if (response.status_code < 200 or response.status_code >= 300 or
-                    'Content-Encoding' in response.headers):
+            if (
+                response.status_code < 200
+                or response.status_code >= 300
+                or 'Content-Encoding' in response.headers
+            ):
                 return response
             gzip_buffer = IO()
-            gzip_file = gzip.GzipFile(mode='wb',
-                                      fileobj=gzip_buffer)
+            gzip_file = gzip.GzipFile(mode='wb', fileobj=gzip_buffer)
             gzip_file.write(response.data)
             gzip_file.close()
 
@@ -103,30 +107,22 @@ def has_dag_access(**dag_kwargs) -> Callable[[T], T]:
     """
     Decorator to check whether the user has read / write permission on the dag.
     """
+
     def decorator(f: T):
         @functools.wraps(f)
         def wrapper(self, *args, **kwargs):
-            has_access = self.appbuilder.sm.has_access
             dag_id = request.values.get('dag_id')
-            # if it is false, we need to check whether user has write access 
on the dag
-            can_dag_edit = dag_kwargs.get('can_dag_edit', False)
-
-            # 1. check whether the user has can_dag_edit permissions on 
all_dags
-            # 2. if 1 false, check whether the user
-            #    has can_dag_edit permissions on the dag
-            # 3. if 2 false, check whether it is can_dag_read view,
-            #    and whether user has the permissions
-            if (
-                has_access('can_dag_edit', 'all_dags') or
-                has_access('can_dag_edit', dag_id) or (not can_dag_edit and
-                                                       
(has_access('can_dag_read',
-                                                                   'all_dags') 
or
-                                                        
has_access('can_dag_read',
-                                                                   dag_id)))):
-                return f(self, *args, **kwargs)
+            needs_edit_access = dag_kwargs.get('can_dag_edit', False)
+
+            if needs_edit_access:
+                if self.appbuilder.sm.can_edit_dag(dag_id):
+                    return f(self, *args, **kwargs)
             else:
-                flash("Access is Denied", "danger")
-                return redirect(url_for(self.appbuilder.sm.auth_view.
-                                        __class__.__name__ + ".login"))
+                if self.appbuilder.sm.can_read_dag(dag_id):
+                    return f(self, *args, **kwargs)
+            flash("Access is Denied", "danger")
+            return 
redirect(url_for(self.appbuilder.sm.auth_view.__class__.__name__ + ".login"))
+
         return cast(T, wrapper)
+
     return decorator
diff --git a/airflow/www/security.py b/airflow/www/security.py
index 20686b7..9aa543e 100644
--- a/airflow/www/security.py
+++ b/airflow/www/security.py
@@ -29,6 +29,7 @@ from sqlalchemy.orm import joinedload
 from airflow import models
 from airflow.exceptions import AirflowException
 from airflow.models import DagModel
+from airflow.security import permissions
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.session import provide_session
 from airflow.www.utils import CustomSQLAInterface
@@ -41,13 +42,6 @@ EXISTING_ROLES = {
     'Public',
 }
 
-CAN_CREATE = 'can_create'
-CAN_READ = 'can_read'
-CAN_DAG_READ = 'can_dag_read'
-CAN_EDIT = 'can_edit'
-CAN_DAG_EDIT = 'can_dag_edit'
-CAN_DELETE = 'can_delete'
-
 
 class AirflowSecurityManager(SecurityManager, LoggingMixin):
     """Custom security manager, which introduces an permission model adapted 
to Airflow"""
@@ -160,19 +154,10 @@ class AirflowSecurityManager(SecurityManager, 
LoggingMixin):
     # [END security_op_perms]
 
     # global view-menu for dag-level access
-    DAG_VMS = {'all_dags'}
+    DAG_VMS = {permissions.RESOURCE_DAGS}
 
-    WRITE_DAG_PERMS = {
-        'can_dag_edit',
-        'can_edit',
-    }
-
-    READ_DAG_PERMS = {
-        'can_dag_read',
-        'can_read',
-    }
-
-    DAG_PERMS = WRITE_DAG_PERMS | READ_DAG_PERMS
+    READ_DAG_PERMS = {permissions.ACTION_CAN_READ}
+    DAG_PERMS = {permissions.ACTION_CAN_READ, permissions.ACTION_CAN_EDIT}
 
     ###########################################################################
     #                     DEFAULT ROLE CONFIGURATIONS
@@ -280,11 +265,11 @@ class AirflowSecurityManager(SecurityManager, 
LoggingMixin):
 
     def get_readable_dags(self, user):
         """Gets the DAGs readable by authenticated user."""
-        return self.get_accessible_dags([CAN_READ, CAN_DAG_READ], user)
+        return self.get_accessible_dags([permissions.ACTION_CAN_READ], user)
 
     def get_editable_dags(self, user):
         """Gets the DAGs editable by authenticated user."""
-        return self.get_accessible_dags([CAN_EDIT, CAN_DAG_EDIT], user)
+        return self.get_accessible_dags([permissions.ACTION_CAN_EDIT], user)
 
     def get_readable_dag_ids(self, user) -> Set[str]:
         """Gets the DAG IDs readable by authenticated user."""
@@ -296,7 +281,9 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):
 
     def get_accessible_dag_ids(self, user) -> Set[str]:
         """Gets the DAG IDs editable or readable by authenticated user."""
-        accessible_dags = self.get_accessible_dags([CAN_EDIT, CAN_DAG_EDIT, 
CAN_READ, CAN_DAG_READ], user)
+        accessible_dags = self.get_accessible_dags(
+            [permissions.ACTION_CAN_EDIT, permissions.ACTION_CAN_READ], user
+        )
         return set(dag.dag_id for dag in accessible_dags)
 
     @provide_session
@@ -320,33 +307,83 @@ class AirflowSecurityManager(SecurityManager, 
LoggingMixin):
             for permission in role.permissions:
                 resource = permission.view_menu.name
                 action = permission.permission.name
-                if action in user_actions:
+                if action not in user_actions:
+                    continue
+
+                if resource.startswith(permissions.RESOURCE_DAG_PREFIX):
+                    
resources.add(resource[len(permissions.RESOURCE_DAG_PREFIX) :])
+                else:
                     resources.add(resource)
 
-        if bool({'Dag', 'all_dags'}.intersection(resources)):
+        if permissions.RESOURCE_DAGS in resources:
             return session.query(DagModel)
 
         return session.query(DagModel).filter(DagModel.dag_id.in_(resources))
 
-    def has_access(self, permission, view_name, user=None) -> bool:
+    def can_read_dag(self, dag_id, user=None) -> bool:
+        """Determines whether a user has DAG read access."""
+        if not user:
+            user = g.user
+        prefixed_dag_id = self.prefixed_dag_id(dag_id)
+        return self._has_view_access(
+            user, permissions.ACTION_CAN_READ, permissions.RESOURCE_DAGS
+        ) or self._has_view_access(user, permissions.ACTION_CAN_READ, 
prefixed_dag_id)
+
+    def can_edit_dag(self, dag_id, user=None) -> bool:
+        """Determines whether a user has DAG edit access."""
+        if not user:
+            user = g.user
+        prefixed_dag_id = self.prefixed_dag_id(dag_id)
+
+        return self._has_view_access(
+            user, permissions.ACTION_CAN_EDIT, permissions.RESOURCE_DAGS
+        ) or self._has_view_access(user, permissions.ACTION_CAN_EDIT, 
prefixed_dag_id)
+
+    def prefixed_dag_id(self, dag_id):
+        """Returns the permission name for a DAG id."""
+        if dag_id == permissions.RESOURCE_DAGS:
+            return dag_id
+
+        if dag_id.startswith(permissions.RESOURCE_DAG_PREFIX):
+            return dag_id
+        return f"{permissions.RESOURCE_DAG_PREFIX}{dag_id}"
+
+    def is_dag_resource(self, resource_name):
+        """Determines if a permission belongs to a DAG or all DAGs."""
+        if resource_name == permissions.RESOURCE_DAGS:
+            return True
+        return resource_name.startswith(permissions.RESOURCE_DAG_PREFIX)
+
+    def has_access(self, permission, resource, user=None) -> bool:
         """
         Verify whether a given user could perform certain permission
-        (e.g can_read, can_write) on the given dag_id.
+        (e.g can_read, can_write) on the given resource.
 
-        :param permission: permission on dag_id(e.g can_read, can_edit).
+        :param permission: permission on resource (e.g can_read, can_edit).
         :type permission: str
-        :param view_name: name of view-menu(e.g dag id is a view-menu as well).
-        :type view_name: str
+        :param resource: name of view-menu or resource.
+        :type resource: str
         :param user: user name
         :type user: str
-        :return: a bool whether user could perform certain permission on the 
dag_id.
+        :return: a bool whether user could perform certain permission on the 
resource.
         :rtype bool
         """
         if not user:
             user = g.user
+        # breakpoint()
         if user.is_anonymous:
-            return self.is_item_public(permission, view_name)
-        return self._has_view_access(user, permission, view_name)
+            return self.is_item_public(permission, resource)
+
+        has_access = self._has_view_access(user, permission, resource)
+        # FAB built-in view access method. Won't work for AllDag access.
+
+        if self.is_dag_resource(resource):
+            if permission == permissions.ACTION_CAN_READ:
+                has_access |= self.can_read_dag(resource, user)
+            elif permission == permissions.ACTION_CAN_EDIT:
+                has_access |= self.can_edit_dag(resource, user)
+
+        return has_access
 
     def _get_and_cache_perms(self):
         """
@@ -377,13 +414,13 @@ class AirflowSecurityManager(SecurityManager, 
LoggingMixin):
         """
         Has all the dag access in any of the 3 cases:
         1. Role needs to be in (Admin, Viewer, User, Op).
-        2. Has can_dag_read permission on all_dags view.
-        3. Has can_dag_edit permission on all_dags view.
+        2. Has can_read permission on dags view.
+        3. Has can_edit permission on dags view.
         """
         return (
             self._has_role(['Admin', 'Viewer', 'Op', 'User'])
-            or self._has_perm('can_dag_read', 'all_dags')
-            or self._has_perm('can_dag_edit', 'all_dags')
+            or self._has_perm(permissions.ACTION_CAN_READ, 
permissions.RESOURCE_DAGS)
+            or self._has_perm(permissions.ACTION_CAN_EDIT, 
permissions.RESOURCE_DAGS)
         )
 
     def clean_perms(self):
@@ -467,10 +504,10 @@ class AirflowSecurityManager(SecurityManager, 
LoggingMixin):
             .all()
         )
 
-        # create can_dag_edit and can_dag_read permissions for every dag(vm)
+        # create can_edit and can_read permissions for every dag(vm)
         for dag in all_dags_models:
             for perm in self.DAG_PERMS:
-                merge_pv(perm, dag.dag_id)
+                merge_pv(perm, self.prefixed_dag_id(dag.dag_id))
 
         # for all the dag-level role, add the permission of viewer
         # with the dag view to ab_permission_view
@@ -481,7 +518,7 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):
         update_perm_views = []
 
         # need to remove all_dag vm from all the existing view-menus
-        dag_vm = self.find_view_menu('all_dags')
+        dag_vm = self.find_view_menu(permissions.RESOURCE_DAGS)
         ab_perm_view_role = sqla_models.assoc_permissionview_role
         perm_view = self.permissionview_model
         view_menu = self.viewmenu_model
@@ -501,6 +538,7 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):
         for role in dag_role:
             # pylint: disable=no-member
             # Get all the perm-view of the role
+
             existing_perm_view_by_user = 
self.get_session.query(ab_perm_view_role).filter(
                 ab_perm_view_role.columns.role_id == role.id
             )
@@ -520,18 +558,23 @@ class AirflowSecurityManager(SecurityManager, 
LoggingMixin):
     def update_admin_perm_view(self):
         """
         Admin should has all the permission-views, except the dag views.
-        because Admin have already have all_dags permission.
+        because Admin have already have Dag permission.
         Add the missing ones to the table for admin.
 
         :return: None.
         """
-        all_dag_view = self.find_view_menu('all_dags')
-        dag_perm_ids = [self.find_permission('can_dag_edit').id, 
self.find_permission('can_dag_read').id]
+        all_dag_view = self.find_view_menu(permissions.RESOURCE_DAGS)
+        dag_pvs = (
+            self.get_session.query(sqla_models.ViewMenu)
+            
.filter(sqla_models.ViewMenu.name.like(f"{permissions.RESOURCE_DAG_PREFIX}%"))
+            .all()
+        )
+        pv_ids = [pv.id for pv in dag_pvs]
         pvms = (
             self.get_session.query(sqla_models.PermissionView)
             .filter(
                 ~and_(
-                    sqla_models.PermissionView.permission_id.in_(dag_perm_ids),
+                    sqla_models.PermissionView.view_menu_id.in_(pv_ids),
                     sqla_models.PermissionView.view_menu_id != all_dag_view.id,
                 )
             )
@@ -553,7 +596,6 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):
 
         :return: None.
         """
-        self.log.debug('Start syncing user roles.')
         # Create global all-dag VM
         self.create_perm_vm_for_all_dag()
 
@@ -564,19 +606,18 @@ class AirflowSecurityManager(SecurityManager, 
LoggingMixin):
             perms = config['perms']
             self.init_role(role, vms, perms)
         self.create_custom_dag_permission_view()
-
         # init existing roles, the rest role could be created through UI.
         self.update_admin_perm_view()
         self.clean_perms()
 
-    def sync_resource_permissions(self, permissions=None):
+    def sync_resource_permissions(self, perms=None):
         """
         Populates resource-based permissions.
         """
-        if not permissions:
+        if not perms:
             return
 
-        for action, resource in permissions:
+        for action, resource in perms:
             self.add_view_menu(resource)
             self.add_permission_view_menu(action, resource)
 
@@ -589,17 +630,18 @@ class AirflowSecurityManager(SecurityManager, 
LoggingMixin):
         :type dag_id: str
         :param access_control: a dict where each key is a rolename and
             each value is a set() of permission names (e.g.,
-            {'can_dag_read'}
+            {'can_read'}
         :type access_control: dict
         :return:
         """
+        prefixed_dag_id = self.prefixed_dag_id(dag_id)
         for dag_perm in self.DAG_PERMS:
-            perm_on_dag = self.find_permission_view_menu(dag_perm, dag_id)
+            perm_on_dag = self.find_permission_view_menu(dag_perm, 
prefixed_dag_id)
             if perm_on_dag is None:
-                self.add_permission_view_menu(dag_perm, dag_id)
+                self.add_permission_view_menu(dag_perm, prefixed_dag_id)
 
         if access_control:
-            self._sync_dag_view_permissions(dag_id, access_control)
+            self._sync_dag_view_permissions(prefixed_dag_id, access_control)
 
     def _sync_dag_view_permissions(self, dag_id, access_control):
         """Set the access policy on the given DAG's ViewModel.
@@ -608,15 +650,16 @@ class AirflowSecurityManager(SecurityManager, 
LoggingMixin):
         :type dag_id: str
         :param access_control: a dict where each key is a rolename and
             each value is a set() of permission names (e.g.,
-            {'can_dag_read'}
+            {'can_read'}
         :type access_control: dict
         """
+        prefixed_dag_id = self.prefixed_dag_id(dag_id)
 
         def _get_or_create_dag_permission(perm_name):
-            dag_perm = self.find_permission_view_menu(perm_name, dag_id)
+            dag_perm = self.find_permission_view_menu(perm_name, 
prefixed_dag_id)
             if not dag_perm:
-                self.log.info("Creating new permission '%s' on view '%s'", 
perm_name, dag_id)
-                dag_perm = self.add_permission_view_menu(perm_name, dag_id)
+                self.log.info("Creating new permission '%s' on view '%s'", 
perm_name, prefixed_dag_id)
+                dag_perm = self.add_permission_view_menu(perm_name, 
prefixed_dag_id)
 
             return dag_perm
 
@@ -628,11 +671,14 @@ class AirflowSecurityManager(SecurityManager, 
LoggingMixin):
                     target_perms_for_role = access_control.get(role.name, {})
                     if perm.permission.name not in target_perms_for_role:
                         self.log.info(
-                            "Revoking '%s' on DAG '%s' for role '%s'", 
perm.permission, dag_id, role.name
+                            "Revoking '%s' on DAG '%s' for role '%s'",
+                            perm.permission,
+                            prefixed_dag_id,
+                            role.name,
                         )
                         self.del_permission_role(role, perm)
 
-        dag_view = self.find_view_menu(dag_id)
+        dag_view = self.find_view_menu(prefixed_dag_id)
         if dag_view:
             _revoke_stale_permissions(dag_view)
 
@@ -641,7 +687,7 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):
             if not role:
                 raise AirflowException(
                     "The access_control mapping for DAG '{}' includes a role "
-                    "named '{}', but that role does not exist".format(dag_id, 
rolename)
+                    "named '{}', but that role does not 
exist".format(prefixed_dag_id, rolename)
                 )
 
             perms = set(perms)
@@ -650,7 +696,7 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin):
                 raise AirflowException(
                     "The access_control map for DAG '{}' includes the 
following "
                     "invalid permissions: {}; The set of valid permissions "
-                    "is: {}".format(dag_id, (perms - self.DAG_PERMS), 
self.DAG_PERMS)
+                    "is: {}".format(prefixed_dag_id, (perms - self.DAG_PERMS), 
self.DAG_PERMS)
                 )
 
             for perm_name in perms:
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 342add0..345d705 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -63,6 +63,7 @@ from airflow.models.baseoperator import BaseOperator
 from airflow.models.dagcode import DagCode
 from airflow.models.dagrun import DagRun, DagRunType
 from airflow.models.taskinstance import TaskInstance
+from airflow.security import permissions
 from airflow.ti_deps.dep_context import DepContext
 from airflow.ti_deps.dependencies_deps import RUNNING_DEPS, 
SCHEDULER_QUEUED_DEPS
 from airflow.utils import timezone
@@ -461,7 +462,7 @@ class Airflow(AirflowBaseView):  # noqa: D101  pylint: 
disable=too-many-public-m
             if arg_tags_filter:
                 dags_query = 
dags_query.filter(DagModel.tags.any(DagTag.name.in_(arg_tags_filter)))
 
-            if 'all_dags' not in filter_dag_ids:
+            if permissions.RESOURCE_DAGS not in filter_dag_ids:
                 dags_query = 
dags_query.filter(DagModel.dag_id.in_(filter_dag_ids))
                 # pylint: enable=no-member
 
@@ -545,7 +546,7 @@ class Airflow(AirflowBaseView):  # noqa: D101  pylint: 
disable=too-many-public-m
         dr = models.DagRun
 
         allowed_dag_ids = 
current_app.appbuilder.sm.get_accessible_dag_ids(g.user)
-        if 'all_dags' in allowed_dag_ids:
+        if permissions.RESOURCE_DAGS in allowed_dag_ids:
             allowed_dag_ids = [dag_id for dag_id, in 
session.query(models.DagModel.dag_id)]
 
         dag_state_stats = session.query(dr.dag_id, dr.state, 
sqla.func.count(dr.state))\
@@ -594,7 +595,7 @@ class Airflow(AirflowBaseView):  # noqa: D101  pylint: 
disable=too-many-public-m
         if not allowed_dag_ids:
             return wwwutils.json_response({})
 
-        if 'all_dags' in allowed_dag_ids:
+        if permissions.RESOURCE_DAGS in allowed_dag_ids:
             allowed_dag_ids = {dag_id for dag_id, in 
session.query(models.DagModel.dag_id)}
 
         # Filter by post parameters
@@ -705,7 +706,7 @@ class Airflow(AirflowBaseView):  # noqa: D101  pylint: 
disable=too-many-public-m
         """Last DAG runs"""
         allowed_dag_ids = 
current_app.appbuilder.sm.get_accessible_dag_ids(g.user)
 
-        if 'all_dags' in allowed_dag_ids:
+        if permissions.RESOURCE_DAGS in allowed_dag_ids:
             allowed_dag_ids = [dag_id for dag_id, in 
session.query(models.DagModel.dag_id)]
 
         # Filter by post parameters
@@ -1389,7 +1390,7 @@ class Airflow(AirflowBaseView):  # noqa: D101  pylint: 
disable=too-many-public-m
         """Mark Dag Blocked."""
         allowed_dag_ids = 
current_app.appbuilder.sm.get_accessible_dag_ids(g.user)
 
-        if 'all_dags' in allowed_dag_ids:
+        if permissions.RESOURCE_DAGS in allowed_dag_ids:
             allowed_dag_ids = [dag_id for dag_id, in 
session.query(models.DagModel.dag_id)]
 
         # Filter by post parameters
@@ -3142,7 +3143,7 @@ class DagModelView(AirflowModelView):
 
         filter_dag_ids = 
current_app.appbuilder.sm.get_accessible_dag_ids(g.user)
         # pylint: disable=no-member
-        if not bool({'all_dags', 'Dag'}.intersection(filter_dag_ids)):
+        if permissions.RESOURCE_DAGS not in filter_dag_ids:
             dag_ids_query = 
dag_ids_query.filter(DagModel.dag_id.in_(filter_dag_ids))
             owners_query = 
owners_query.filter(DagModel.dag_id.in_(filter_dag_ids))
         # pylint: enable=no-member
diff --git a/tests/api_connexion/endpoints/test_dag_endpoint.py 
b/tests/api_connexion/endpoints/test_dag_endpoint.py
index ee75dbc..692159e 100644
--- a/tests/api_connexion/endpoints/test_dag_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dag_endpoint.py
@@ -25,6 +25,7 @@ from airflow.api_connexion.exceptions import 
EXCEPTIONS_LINK_MAP
 from airflow.models import DagBag, DagModel
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.operators.dummy_operator import DummyOperator
+from airflow.security import permissions
 from airflow.utils.session import provide_session
 from airflow.www import app
 from tests.test_utils.api_connexion_utils import assert_401, create_user, 
delete_user
@@ -52,10 +53,10 @@ class TestDagEndpoint(unittest.TestCase):
             username="test",
             role_name="Test",
             permissions=[
-                ("can_create", "Dag"),
-                ("can_read", "Dag"),
-                ("can_edit", "Dag"),
-                ("can_delete", "Dag"),
+                ("can_create", permissions.RESOURCE_DAGS),
+                ("can_read", permissions.RESOURCE_DAGS),
+                ("can_edit", permissions.RESOURCE_DAGS),
+                ("can_delete", permissions.RESOURCE_DAGS),
             ],
         )
         create_user(cls.app, username="test_no_permissions", 
role_name="TestNoPermissions")  # type: ignore
diff --git a/tests/api_connexion/endpoints/test_dag_run_endpoint.py 
b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
index 2b4c263..8008bee 100644
--- a/tests/api_connexion/endpoints/test_dag_run_endpoint.py
+++ b/tests/api_connexion/endpoints/test_dag_run_endpoint.py
@@ -21,6 +21,7 @@ from parameterized import parameterized
 
 from airflow.api_connexion.exceptions import EXCEPTIONS_LINK_MAP
 from airflow.models import DagModel, DagRun
+from airflow.security import permissions
 from airflow.utils import timezone
 from airflow.utils.session import create_session, provide_session
 from airflow.utils.types import DagRunType
@@ -42,7 +43,7 @@ class TestDagRunEndpoint(unittest.TestCase):
             username="test",
             role_name="Test",
             permissions=[
-                ("can_read", "Dag"),
+                ("can_read", permissions.RESOURCE_DAGS),
                 ("can_create", "DagRun"),
                 ("can_read", "DagRun"),
                 ("can_edit", "DagRun"),
diff --git a/tests/api_connexion/endpoints/test_extra_link_endpoint.py 
b/tests/api_connexion/endpoints/test_extra_link_endpoint.py
index afd73cd..4fef5bf 100644
--- a/tests/api_connexion/endpoints/test_extra_link_endpoint.py
+++ b/tests/api_connexion/endpoints/test_extra_link_endpoint.py
@@ -27,6 +27,7 @@ from airflow.models.dagrun import DagRun
 from airflow.models.xcom import XCom
 from airflow.plugins_manager import AirflowPlugin
 from airflow.providers.google.cloud.operators.bigquery import 
BigQueryExecuteQueryOperator
+from airflow.security import permissions
 from airflow.utils.dates import days_ago
 from airflow.utils.session import provide_session
 from airflow.utils.timezone import datetime
@@ -51,7 +52,7 @@ class TestGetExtraLinks(unittest.TestCase):
             username="test",
             role_name="Test",
             permissions=[
-                ('can_read', 'Dag'),
+                ('can_read', permissions.RESOURCE_DAGS),
                 ('can_read', 'DagRun'),
                 ('can_read', 'Task'),
                 ('can_read', 'TaskInstance'),
diff --git a/tests/api_connexion/endpoints/test_log_endpoint.py 
b/tests/api_connexion/endpoints/test_log_endpoint.py
index fffcf8e..5ece059 100644
--- a/tests/api_connexion/endpoints/test_log_endpoint.py
+++ b/tests/api_connexion/endpoints/test_log_endpoint.py
@@ -31,6 +31,7 @@ from airflow.api_connexion.exceptions import 
EXCEPTIONS_LINK_MAP
 from airflow.config_templates.airflow_local_settings import 
DEFAULT_LOGGING_CONFIG
 from airflow.models import DagRun, TaskInstance
 from airflow.operators.dummy_operator import DummyOperator
+from airflow.security import permissions
 from airflow.utils import timezone
 from airflow.utils.session import create_session, provide_session
 from airflow.utils.types import DagRunType
@@ -56,7 +57,11 @@ class TestGetLog(unittest.TestCase):
             cls.app,
             username="test",
             role_name="Test",
-            permissions=[('can_read', 'Dag'), ('can_read', 'DagRun'), 
('can_read', 'Task')],
+            permissions=[
+                ('can_read', permissions.RESOURCE_DAGS),
+                ('can_read', 'DagRun'),
+                ('can_read', 'Task'),
+            ],
         )
         create_user(cls.app, username="test_no_permissions", 
role_name="TestNoPermissions")
 
diff --git a/tests/api_connexion/endpoints/test_task_endpoint.py 
b/tests/api_connexion/endpoints/test_task_endpoint.py
index f599055..939d587 100644
--- a/tests/api_connexion/endpoints/test_task_endpoint.py
+++ b/tests/api_connexion/endpoints/test_task_endpoint.py
@@ -22,6 +22,7 @@ from airflow import DAG
 from airflow.models import DagBag
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.operators.dummy_operator import DummyOperator
+from airflow.security import permissions
 from airflow.www import app
 from tests.test_utils.api_connexion_utils import assert_401, create_user, 
delete_user
 from tests.test_utils.config import conf_vars
@@ -47,7 +48,11 @@ class TestTaskEndpoint(unittest.TestCase):
             cls.app,  # type: ignore
             username="test",
             role_name="Test",
-            permissions=[('can_read', 'Dag'), ('can_read', 'DagRun'), 
('can_read', 'Task')],
+            permissions=[
+                ('can_read', permissions.RESOURCE_DAGS),
+                ('can_read', 'DagRun'),
+                ('can_read', 'Task'),
+            ],
         )
         create_user(cls.app, username="test_no_permissions", 
role_name="TestNoPermissions")  # type: ignore
 
diff --git a/tests/api_connexion/endpoints/test_task_instance_endpoint.py 
b/tests/api_connexion/endpoints/test_task_instance_endpoint.py
index 06b33dc..9413920 100644
--- a/tests/api_connexion/endpoints/test_task_instance_endpoint.py
+++ b/tests/api_connexion/endpoints/test_task_instance_endpoint.py
@@ -21,6 +21,7 @@ from unittest import mock
 from parameterized import parameterized
 
 from airflow.models import DagBag, DagRun, TaskInstance, SlaMiss
+from airflow.security import permissions
 from airflow.utils.types import DagRunType
 from airflow.utils.session import provide_session
 from airflow.utils.state import State
@@ -46,7 +47,7 @@ class TestTaskInstanceEndpoint(unittest.TestCase):
             username="test",
             role_name="Test",
             permissions=[
-                ('can_read', 'Dag'),
+                ('can_read', permissions.RESOURCE_DAGS),
                 ('can_read', 'DagRun'),
                 ('can_read', 'Task'),
                 ('can_edit', 'Task'),
diff --git a/tests/api_connexion/endpoints/test_xcom_endpoint.py 
b/tests/api_connexion/endpoints/test_xcom_endpoint.py
index 368a0e0..b8fc5f7 100644
--- a/tests/api_connexion/endpoints/test_xcom_endpoint.py
+++ b/tests/api_connexion/endpoints/test_xcom_endpoint.py
@@ -19,6 +19,7 @@ import unittest
 from parameterized import parameterized
 
 from airflow.models import DagModel, DagRun as DR, XCom
+from airflow.security import permissions
 from airflow.utils.dates import parse_execution_date
 from airflow.utils.session import provide_session
 from airflow.utils.types import DagRunType
@@ -40,7 +41,7 @@ class TestXComEndpoint(unittest.TestCase):
             username="test",
             role_name="Test",
             permissions=[
-                ("can_read", "Dag"),
+                ("can_read", permissions.RESOURCE_DAGS),
                 ("can_read", "DagRun"),
                 ("can_read", "Task"),
                 ("can_read", "XCom"),
diff --git a/tests/cli/commands/test_sync_perm_command.py 
b/tests/cli/commands/test_sync_perm_command.py
index ce87023..8d2f117 100644
--- a/tests/cli/commands/test_sync_perm_command.py
+++ b/tests/cli/commands/test_sync_perm_command.py
@@ -38,7 +38,7 @@ class TestCliSyncPerm(unittest.TestCase):
         self.expect_dagbag_contains([
             DAG('has_access_control',
                 access_control={
-                    'Public': {'can_dag_read'}
+                    'Public': {'can_read'}
                 }),
             DAG('no_access_control')
         ], dagbag_mock)
@@ -56,7 +56,7 @@ class TestCliSyncPerm(unittest.TestCase):
         self.assertEqual(2, len(appbuilder.sm.sync_perm_for_dag.mock_calls))
         appbuilder.sm.sync_perm_for_dag.assert_any_call(
             'has_access_control',
-            {'Public': {'can_dag_read'}}
+            {'Public': {'can_read'}}
         )
         appbuilder.sm.sync_perm_for_dag.assert_any_call(
             'no_access_control',
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 8a11cf7..3280922 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -31,6 +31,7 @@ from unittest import mock
 from unittest.mock import patch
 
 import pendulum
+import pytest
 from dateutil.relativedelta import relativedelta
 from freezegun import freeze_time
 from parameterized import parameterized
@@ -1653,6 +1654,18 @@ class TestDag(unittest.TestCase):
         next_subdag_date = subdag.next_dagrun_after_date(None)
         assert next_subdag_date is None, "SubDags should never have DagRuns 
created by the scheduler"
 
+    def test_replace_outdated_access_control_actions(self):
+        outdated_permissions = {'role1': {'can_read', 'can_edit'}, 'role2': 
{'can_dag_read', 'can_dag_edit'}}
+        updated_permissions = {'role1': {'can_read', 'can_edit'}, 'role2': 
{'can_read', 'can_edit'}}
+
+        with pytest.warns(DeprecationWarning):
+            dag = DAG(dag_id='dag_with_outdated_perms', 
access_control=outdated_permissions)
+        self.assertEqual(dag.access_control, updated_permissions)
+
+        with pytest.warns(DeprecationWarning):
+            dag.access_control = outdated_permissions
+        self.assertEqual(dag.access_control, updated_permissions)
+
 
 class TestDagModel:
 
diff --git a/tests/serialization/test_dag_serialization.py 
b/tests/serialization/test_dag_serialization.py
index 93d1110..ed85ded 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -132,8 +132,8 @@ serialized_simple_dag_ground_truth = {
                 "test_role": {
                     "__type": "set",
                     "__var": [
-                        "can_dag_read",
-                        "can_dag_edit"
+                        "can_read",
+                        "can_edit"
                     ]
                 }
             }
@@ -164,7 +164,7 @@ def make_simple_dag():
         start_date=datetime(2019, 8, 1),
         is_paused_upon_creation=False,
         access_control={
-            "test_role": {"can_dag_read", "can_dag_edit"}
+            "test_role": {"can_read", "can_edit"}
         }
     ) as dag:
         CustomOperator(task_id='custom_task')
diff --git a/tests/test_utils/fab_utils.py b/tests/test_utils/fab_utils.py
new file mode 100644
index 0000000..27c7f67
--- /dev/null
+++ b/tests/test_utils/fab_utils.py
@@ -0,0 +1,80 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from airflow.api_connexion.exceptions import EXCEPTIONS_LINK_MAP
+from airflow.www import security
+
+
+def create_user(app, username, role_name, permissions=None):
+    appbuilder = app.appbuilder
+
+    # Removes user and role so each test has isolated test data.
+    delete_user(app, username)
+    delete_role(app, role_name)
+    role = create_role(app, role_name, permissions)
+
+    return appbuilder.sm.add_user(
+        username=username,
+        first_name=username,
+        last_name=username,
+        email=f"{username}@fab.org",
+        role=role,
+        password=username,
+    )
+
+
+def create_role(app, name, permissions=None):
+    appbuilder = app.appbuilder
+    role = appbuilder.sm.find_role(name)
+    if not role:
+        role = appbuilder.sm.add_role(name)
+    if not permissions:
+        permissions = []
+    for permission in permissions:
+        perm_object = appbuilder.sm.find_permission_view_menu(*permission)
+        appbuilder.sm.add_permission_role(role, perm_object)
+    return role
+
+
+def delete_role(app, name):
+    if app.appbuilder.sm.find_role(name):
+        app.appbuilder.sm.delete_role(name)
+
+
+def delete_roles(app):
+    for role in app.appbuilder.sm.get_all_roles():
+        if role.name not in security.EXISTING_ROLES:
+            app.appbuilder.sm.delete_role(role.name)
+
+
+def delete_user(app, username):
+    appbuilder = app.appbuilder
+    for user in appbuilder.sm.get_all_users():
+        if user.username == username:
+            for role in user.roles:
+                delete_role(app, role.name)
+            appbuilder.sm.del_register_user(user)
+            break
+
+
+def assert_401(response):
+    assert response.status_code == 401
+    assert response.json == {
+        'detail': None,
+        'status': 401,
+        'title': 'Unauthorized',
+        'type': EXCEPTIONS_LINK_MAP[401],
+    }
diff --git a/tests/www/test_security.py b/tests/www/test_security.py
index fc7f57a..f3529b6 100644
--- a/tests/www/test_security.py
+++ b/tests/www/test_security.py
@@ -28,13 +28,15 @@ from sqlalchemy import Column, Date, Float, Integer, String
 from airflow import settings
 from airflow.exceptions import AirflowException
 from airflow.models import DagModel
+from airflow.security import permissions
 from airflow.www import app as application
 from airflow.www.utils import CustomSQLAInterface
-from tests.test_utils.db import clear_db_runs
+from tests.test_utils import fab_utils
+from tests.test_utils.db import clear_db_dags, clear_db_runs
 from tests.test_utils.mock_security_manager import MockSecurityManager
 
-READ_WRITE = {'can_dag_read', 'can_dag_edit'}
-READ_ONLY = {'can_dag_read'}
+READ_WRITE = {'can_read', 'can_edit'}
+READ_ONLY = {'can_read'}
 
 logging.basicConfig(format='%(asctime)s:%(levelname)s:%(name)s:%(message)s')
 logging.getLogger().setLevel(logging.DEBUG)
@@ -76,18 +78,22 @@ class TestSecurity(unittest.TestCase):
         cls.appbuilder = cls.app.appbuilder  # pylint: disable=no-member
         cls.app.config['WTF_CSRF_ENABLED'] = False
         cls.security_manager = cls.appbuilder.sm
-        cls.role_admin = cls.security_manager.find_role('Admin')
-        cls.user = cls.appbuilder.sm.add_user(
-            'admin', 'admin', 'user', 'ad...@fab.org', cls.role_admin, 
'general'
-        )
+        cls.delete_roles()
 
     def setUp(self):
+        clear_db_runs()
+        clear_db_dags()
         self.db = SQLA(self.app)
         self.appbuilder.add_view(SomeBaseView, "SomeBaseView", 
category="BaseViews")
         self.appbuilder.add_view(SomeModelView, "SomeModelView", 
category="ModelViews")
 
         log.debug("Complete setup!")
 
+    @classmethod
+    def delete_roles(cls):
+        for role_name in ['team-a', 'MyRole1', 'MyRole5', 'Test_Role', 
'MyRole3', 'MyRole2']:
+            fab_utils.delete_role(cls.app, role_name)
+
     def expect_user_is_in_role(self, user, rolename):
         self.security_manager.init_role(rolename, [], [])
         role = self.security_manager.find_role(rolename)
@@ -97,26 +103,28 @@ class TestSecurity(unittest.TestCase):
         user.roles = [role]
         self.security_manager.update_user(user)
 
-    def assert_user_has_dag_perms(self, perms, dag_id):
+    def assert_user_has_dag_perms(self, perms, dag_id, user=None):
         for perm in perms:
             self.assertTrue(
-                self._has_dag_perm(perm, dag_id),
-                "User should have '{}' on DAG '{}'".format(perm, dag_id))
+                self._has_dag_perm(perm, dag_id, user),
+                "User should have '{}' on DAG '{}'".format(perm, dag_id),
+            )
 
-    def assert_user_does_not_have_dag_perms(self, dag_id, perms):
+    def assert_user_does_not_have_dag_perms(self, dag_id, perms, user=None):
         for perm in perms:
             self.assertFalse(
-                self._has_dag_perm(perm, dag_id),
-                "User should not have '{}' on DAG '{}'".format(perm, dag_id))
+                self._has_dag_perm(perm, dag_id, user),
+                "User should not have '{}' on DAG '{}'".format(perm, dag_id),
+            )
 
-    def _has_dag_perm(self, perm, dag_id):
-        return self.security_manager.has_access(
-            perm,
-            dag_id,
-            self.user)
+    def _has_dag_perm(self, perm, dag_id, user):
+        # if not user:
+        #     user = self.user
+        return self.security_manager.has_access(perm, 
self.security_manager.prefixed_dag_id(dag_id), user)
 
     def tearDown(self):
         clear_db_runs()
+        clear_db_dags()
         self.appbuilder = None
         self.app = None
         self.db = None
@@ -145,8 +153,7 @@ class TestSecurity(unittest.TestCase):
         self.security_manager.init_role(role_name, [], [])
         role = self.security_manager.find_role(role_name)
 
-        perm = self.security_manager.\
-            find_permission_view_menu('can_edit', 'RoleModelView')
+        perm = self.security_manager.find_permission_view_menu('can_edit', 
'RoleModelView')
         self.security_manager.add_permission_role(role, perm)
         role_perms_len = len(role.permissions)
 
@@ -165,43 +172,45 @@ class TestSecurity(unittest.TestCase):
     @mock.patch('airflow.www.security.AirflowSecurityManager.get_user_roles')
     def test_get_all_permissions_views(self, mock_get_user_roles):
         role_name = 'MyRole5'
-        role_perms = ['can_some_action']
-        role_vms = ['SomeBaseView']
-        self.security_manager.init_role(role_name, role_vms, role_perms)
-        role = self.security_manager.find_role(role_name)
+        role_perm = 'can_some_action'
+        role_vm = 'SomeBaseView'
+        username = 'get_all_permissions_views'
+
+        with self.app.app_context():
+            user = fab_utils.create_user(self.app, username, role_name, 
permissions=[(role_perm, role_vm),],)
+            role = user.roles[0]
+            mock_get_user_roles.return_value = [role]
 
-        mock_get_user_roles.return_value = [role]
-        self.assertEqual(self.security_manager
-                         .get_all_permissions_views(),
-                         {('can_some_action', 'SomeBaseView')})
+            
self.assertEqual(self.security_manager.get_all_permissions_views(), 
{(role_perm, role_vm)})
 
-        mock_get_user_roles.return_value = []
-        self.assertEqual(len(self.security_manager
-                             .get_all_permissions_views()), 0)
+            mock_get_user_roles.return_value = []
+            
self.assertEqual(len(self.security_manager.get_all_permissions_views()), 0)
 
     def test_get_accessible_dag_ids(self):
         role_name = 'MyRole1'
-        permission_action = ['can_dag_read']
+        permission_action = ['can_read']
         dag_id = 'dag_id'
-        username = "Mr. User"
-        self.security_manager.init_role(role_name, [], [])
-        self.security_manager.sync_perm_for_dag(  # type: ignore  # pylint: 
disable=no-member
-            dag_id, access_control={role_name: permission_action}
-        )
-        role = self.security_manager.find_role(role_name)
-        user = self.security_manager.add_user(
-            username=username,
-            first_name=username,
-            last_name=username,
-            email=f"{username}@fab.org",
-            role=role,
-            password=username,
+        username = "ElUser"
+
+        user = fab_utils.create_user(
+            self.app,
+            username,
+            role_name,
+            permissions=[
+                (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAGS),
+                (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAGS),
+            ],
         )
-        dag_model = DagModel(dag_id="dag_id", fileloc="/tmp/dag_.py", 
schedule_interval="2 2 * * *")
+
+        dag_model = DagModel(dag_id=dag_id, fileloc="/tmp/dag_.py", 
schedule_interval="2 2 * * *")
         self.session.add(dag_model)
         self.session.commit()
-        self.assertEqual(self.security_manager
-                         .get_accessible_dag_ids(user), {'dag_id'})
+
+        self.security_manager.sync_perm_for_dag(  # type: ignore  # pylint: 
disable=no-member
+            dag_id, access_control={role_name: permission_action}
+        )
+
+        self.assertEqual(self.security_manager.get_accessible_dag_ids(user), 
{'dag_id'})
 
     @mock.patch('airflow.www.security.AirflowSecurityManager._has_view_access')
     def test_has_access(self, mock_has_view_access):
@@ -212,10 +221,14 @@ class TestSecurity(unittest.TestCase):
 
     def test_sync_perm_for_dag_creates_permissions_on_view_menus(self):
         test_dag_id = 'TEST_DAG'
+        prefixed_test_dag_id = f'DAG:{test_dag_id}'
         self.security_manager.sync_perm_for_dag(test_dag_id, 
access_control=None)
-        for dag_perm in self.security_manager.DAG_PERMS:
-            self.assertIsNotNone(self.security_manager.
-                                 find_permission_view_menu(dag_perm, 
test_dag_id))
+        self.assertIsNotNone(
+            self.security_manager.find_permission_view_menu('can_read', 
prefixed_test_dag_id)
+        )
+        self.assertIsNotNone(
+            self.security_manager.find_permission_view_menu('can_edit', 
prefixed_test_dag_id)
+        )
 
     @mock.patch('airflow.www.security.AirflowSecurityManager._has_perm')
     @mock.patch('airflow.www.security.AirflowSecurityManager._has_role')
@@ -234,74 +247,79 @@ class TestSecurity(unittest.TestCase):
         with self.assertRaises(AirflowException) as context:
             self.security_manager.sync_perm_for_dag(
                 dag_id='access-control-test',
-                access_control={
-                    'this-role-does-not-exist': ['can_dag_edit', 
'can_dag_read']
-                })
+                access_control={'this-role-does-not-exist': ['can_edit', 
'can_read']},
+            )
         self.assertIn("role does not exist", str(context.exception))
 
+    def test_all_dag_access_doesnt_give_non_dag_access(self):
+        username = 'dag_access_user'
+        role_name = 'dag_access_role'
+        with self.app.app_context():
+            user = fab_utils.create_user(
+                self.app,
+                username,
+                role_name,
+                permissions=[
+                    (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAGS),
+                    (permissions.ACTION_CAN_READ, permissions.RESOURCE_DAGS),
+                ],
+            )
+            self.assertTrue(
+                self.security_manager.has_access(permissions.ACTION_CAN_READ, 
permissions.RESOURCE_DAGS, user)
+            )
+            
self.assertFalse(self.security_manager.has_access(permissions.ACTION_CAN_READ, 
'Task', user))
+
     def test_access_control_with_invalid_permission(self):
         invalid_permissions = [
             'can_varimport',  # a real permission, but not a member of 
DAG_PERMS
             'can_eat_pudding',  # clearly not a real permission
         ]
-        username = "Mrs. User"
-        user = self.security_manager.add_user(
-            username=username,
-            first_name=username,
-            last_name=username,
-            email=f"{username}@fab.org",
-            role=self.role_admin,
-            password=username,
-        )
+        username = "LaUser"
+        user = fab_utils.create_user(self.app, username=username, 
role_name='team-a',)
         for permission in invalid_permissions:
             self.expect_user_is_in_role(user, rolename='team-a')
             with self.assertRaises(AirflowException) as context:
                 self.security_manager.sync_perm_for_dag(
-                    'access_control_test',
-                    access_control={
-                        'team-a': {permission}
-                    })
+                    'access_control_test', access_control={'team-a': 
{permission}}
+                )
             self.assertIn("invalid permissions", str(context.exception))
 
     def test_access_control_is_set_on_init(self):
-        self.expect_user_is_in_role(self.user, rolename='team-a')
-        self.security_manager.sync_perm_for_dag(
-            'access_control_test',
-            access_control={
-                'team-a': ['can_dag_edit', 'can_dag_read']
-            })
-        self.assert_user_has_dag_perms(
-            perms=['can_dag_edit', 'can_dag_read'],
-            dag_id='access_control_test',
-        )
+        username = 'access_control_is_set_on_init'
+        role_name = 'team-a'
+        with self.app.app_context():
+            user = fab_utils.create_user(self.app, username, role_name, 
permissions=[],)
+            self.expect_user_is_in_role(user, rolename='team-a')
+            self.security_manager.sync_perm_for_dag(
+                'access_control_test', access_control={'team-a': ['can_edit', 
'can_read']}
+            )
+            self.assert_user_has_dag_perms(
+                perms=['can_edit', 'can_read'], dag_id='access_control_test', 
user=user
+            )
 
-        self.expect_user_is_in_role(self.user, rolename='NOT-team-a')
-        self.assert_user_does_not_have_dag_perms(
-            perms=['can_dag_edit', 'can_dag_read'],
-            dag_id='access_control_test',
-        )
+            self.expect_user_is_in_role(user, rolename='NOT-team-a')
+            self.assert_user_does_not_have_dag_perms(
+                perms=['can_edit', 'can_read'], dag_id='access_control_test', 
user=user
+            )
 
     def test_access_control_stale_perms_are_revoked(self):
-        self.expect_user_is_in_role(self.user, rolename='team-a')
-        self.security_manager.sync_perm_for_dag(
-            'access_control_test',
-            access_control={'team-a': READ_WRITE})
-        self.assert_user_has_dag_perms(
-            perms=READ_WRITE,
-            dag_id='access_control_test',
-        )
+        username = 'access_control_stale_perms_are_revoked'
+        role_name = 'team-a'
+        with self.app.app_context():
+            user = fab_utils.create_user(self.app, username, role_name, 
permissions=[],)
+            self.expect_user_is_in_role(user, rolename='team-a')
+            self.security_manager.sync_perm_for_dag(
+                'access_control_test', access_control={'team-a': READ_WRITE}
+            )
+            self.assert_user_has_dag_perms(perms=READ_WRITE, 
dag_id='access_control_test', user=user)
 
-        self.security_manager.sync_perm_for_dag(
-            'access_control_test',
-            access_control={'team-a': READ_ONLY})
-        self.assert_user_has_dag_perms(
-            perms=['can_dag_read'],
-            dag_id='access_control_test',
-        )
-        self.assert_user_does_not_have_dag_perms(
-            perms=['can_dag_edit'],
-            dag_id='access_control_test',
-        )
+            self.security_manager.sync_perm_for_dag(
+                'access_control_test', access_control={'team-a': READ_ONLY}
+            )
+            self.assert_user_has_dag_perms(perms=['can_read'], 
dag_id='access_control_test', user=user)
+            self.assert_user_does_not_have_dag_perms(
+                perms=['can_edit'], dag_id='access_control_test', user=user
+            )
 
     def test_no_additional_dag_permission_views_created(self):
         ab_perm_view_role = sqla_models.assoc_permissionview_role
diff --git a/tests/www/test_views.py b/tests/www/test_views.py
index bb6aeb6..e66b3d8 100644
--- a/tests/www/test_views.py
+++ b/tests/www/test_views.py
@@ -52,6 +52,7 @@ from airflow.models.renderedtifields import 
RenderedTaskInstanceFields as RTIF
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.operators.bash import BashOperator
 from airflow.operators.dummy_operator import DummyOperator
+from airflow.security import permissions
 from airflow.ti_deps.dependencies_states import QUEUEABLE_STATES, 
RUNNABLE_STATES
 from airflow.utils import dates, timezone
 from airflow.utils.log.logging_mixin import ExternalLoggingMixin
@@ -141,7 +142,6 @@ class TestBase(unittest.TestCase):
                 role=self.appbuilder.sm.find_role('Admin'),
                 password='test',
             )
-
         if username == 'test_user' and not 
self.appbuilder.sm.find_user(username='test_user'):
             self.appbuilder.sm.add_user(
                 username='test_user',
@@ -1649,6 +1649,10 @@ class TestDagACLView(TestBase):
         super().setUpClass()
         dagbag = models.DagBag(include_examples=True)
         DAG.bulk_write_to_db(dagbag.dags.values())
+        for username in ['all_dag_user', 'dag_read_only', 'dag_faker', 
'dag_tester']:
+            user = cls.appbuilder.sm.find_user(username=username)
+            if user:
+                cls.appbuilder.sm.del_register_user(user)
 
     def prepare_dagruns(self):
         dagbag = models.DagBag(include_examples=True)
@@ -1729,21 +1733,28 @@ class TestDagACLView(TestBase):
         self.logout()
         self.login(username='test',
                    password='test')
-        perm_on_dag = self.appbuilder.sm.\
-            find_permission_view_menu('can_dag_edit', 'example_bash_operator')
         dag_tester_role = self.appbuilder.sm.find_role('dag_acl_tester')
-        self.appbuilder.sm.add_permission_role(dag_tester_role, perm_on_dag)
+        edit_perm_on_dag = self.appbuilder.sm.\
+            find_permission_view_menu('can_edit', 'DAG:example_bash_operator')
+        self.appbuilder.sm.add_permission_role(dag_tester_role, 
edit_perm_on_dag)
+        read_perm_on_dag = self.appbuilder.sm.\
+            find_permission_view_menu('can_read', 'DAG:example_bash_operator')
+        self.appbuilder.sm.add_permission_role(dag_tester_role, 
read_perm_on_dag)
 
-        perm_on_all_dag = self.appbuilder.sm.\
-            find_permission_view_menu('can_dag_edit', 'all_dags')
         all_dag_role = self.appbuilder.sm.find_role('all_dag_role')
-        self.appbuilder.sm.add_permission_role(all_dag_role, perm_on_all_dag)
+        edit_perm_on_all_dag = self.appbuilder.sm.\
+            find_permission_view_menu('can_edit', permissions.RESOURCE_DAGS)
+        self.appbuilder.sm.add_permission_role(all_dag_role, 
edit_perm_on_all_dag)
+        read_perm_on_all_dag = self.appbuilder.sm.\
+            find_permission_view_menu('can_read', permissions.RESOURCE_DAGS)
+        self.appbuilder.sm.add_permission_role(all_dag_role, 
read_perm_on_all_dag)
 
         role_user = self.appbuilder.sm.find_role('User')
-        self.appbuilder.sm.add_permission_role(role_user, perm_on_all_dag)
+        self.appbuilder.sm.add_permission_role(role_user, read_perm_on_all_dag)
+        self.appbuilder.sm.add_permission_role(role_user, edit_perm_on_all_dag)
 
         read_only_perm_on_dag = self.appbuilder.sm.\
-            find_permission_view_menu('can_dag_read', 'example_bash_operator')
+            find_permission_view_menu('can_read', 'DAG:example_bash_operator')
         dag_read_only_role = self.appbuilder.sm.find_role('dag_acl_read_only')
         self.appbuilder.sm.add_permission_role(dag_read_only_role, 
read_only_perm_on_dag)
 
@@ -1751,19 +1762,17 @@ class TestDagACLView(TestBase):
         self.logout()
         self.login(username='test',
                    password='test')
-        test_view_menu = 
self.appbuilder.sm.find_view_menu('example_bash_operator')
+        test_view_menu = 
self.appbuilder.sm.find_view_menu('DAG:example_bash_operator')
         perms_views = 
self.appbuilder.sm.find_permissions_view_menu(test_view_menu)
-        self.assertEqual(len(perms_views), 4)
-
-        permissions = [str(perm) for perm in perms_views]
-        expected_permissions = [
-            'can read on example_bash_operator',
-            'can dag edit on example_bash_operator',
-            'can dag read on example_bash_operator',
-            'can edit on example_bash_operator',
+        self.assertEqual(len(perms_views), 2)
+
+        perms = [str(perm) for perm in perms_views]
+        expected_perms = [
+            'can read on DAG:example_bash_operator',
+            'can edit on DAG:example_bash_operator',
         ]
-        for perm in expected_permissions:
-            self.assertIn(perm, permissions)
+        for perm in expected_perms:
+            self.assertIn(perm, perms)
 
     def test_role_permission_associate(self):
         self.logout()
@@ -1771,8 +1780,8 @@ class TestDagACLView(TestBase):
                    password='test')
         test_role = self.appbuilder.sm.find_role('dag_acl_tester')
         perms = {str(perm) for perm in test_role.permissions}
-        self.assertIn('can dag edit on example_bash_operator', perms)
-        self.assertNotIn('can dag read on example_bash_operator', perms)
+        self.assertIn('can edit on DAG:example_bash_operator', perms)
+        self.assertIn('can read on DAG:example_bash_operator', perms)
 
     def test_index_success(self):
         self.logout()
@@ -2175,7 +2184,7 @@ class TestDagACLView(TestBase):
         self.check_content_not_in_response('example_bash_operator', resp)
 
     def test_success_fail_for_read_only_role(self):
-        # success endpoint need can_dag_edit, which read only role can not 
access
+        # success endpoint need can_edit, which read only role can not access
         self.logout()
         self.login(username='dag_read_only',
                    password='dag_read_only')
@@ -2193,7 +2202,7 @@ class TestDagACLView(TestBase):
         self.check_content_not_in_response('Wait a minute', resp, 
resp_code=302)
 
     def test_tree_success_for_read_only_role(self):
-        # tree view only allows can_dag_read, which read only role could access
+        # tree view only allows can_read, which read only role could access
         self.logout()
         self.login(username='dag_read_only',
                    password='dag_read_only')

Reply via email to