This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch v2-3-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit d7b58db1588474b287669dc58ae2ca72bd35e139 Author: Ephraim Anierobi <splendidzig...@gmail.com> AuthorDate: Wed Jun 8 08:47:26 2022 +0100 Fix permission issue for dag that has dot in name (#23510) The way airflow.security.permissions.resource_name_for_dag determines whether a DAG is a subdag is not right: if a dag_id contains a dot, the permission is not recorded correctly. The current solution makes a query every time we check permissions for a DAG that has a dot in its name. I don't love it, but I think it's better than the other options I considered, such as changing how subdags are named, which would hurt UX. Another option I considered was making a query during parsing; that's undesirable, and it is avoided by passing root_dag_id to resource_name_for_dag. Co-authored-by: Ash Berlin-Taylor <ash_git...@firemirror.com> Co-authored-by: Tzu-ping Chung <uranu...@gmail.com> (cherry picked from commit cc35fcaf89eeff3d89e18088c2e68f01f8baad56) --- airflow/models/dagbag.py | 8 +++-- airflow/security/permissions.py | 19 ++++++------ airflow/www/security.py | 29 ++++++++++++++---- tests/www/test_security.py | 66 +++++++++++++++++++++++++++++++++++------ 4 files changed, 95 insertions(+), 27 deletions(-) diff --git a/airflow/models/dagbag.py b/airflow/models/dagbag.py index c0ef0941b6..929842fd0d 100644 --- a/airflow/models/dagbag.py +++ b/airflow/models/dagbag.py @@ -641,6 +641,8 @@ class DagBag(LoggingMixin): from airflow.security.permissions import DAG_ACTIONS, resource_name_for_dag from airflow.www.fab_security.sqla.models import Action, Permission, Resource + root_dag_id = dag.parent_dag.dag_id if dag.parent_dag else dag.dag_id + def needs_perms(dag_id: str) -> bool: dag_resource_name = resource_name_for_dag(dag_id) for permission_name in DAG_ACTIONS: @@ -655,9 +657,9 @@ class DagBag(LoggingMixin): return True return False - if dag.access_control or needs_perms(dag.dag_id): - self.log.debug("Syncing DAG permissions: %s to the DB", dag.dag_id) + if dag.access_control or
needs_perms(root_dag_id): + self.log.debug("Syncing DAG permissions: %s to the DB", root_dag_id) from airflow.www.security import ApplessAirflowSecurityManager security_manager = ApplessAirflowSecurityManager(session=session) - security_manager.sync_perm_for_dag(dag.dag_id, dag.access_control) + security_manager.sync_perm_for_dag(root_dag_id, dag.access_control) diff --git a/airflow/security/permissions.py b/airflow/security/permissions.py index 2d5c0b9399..2d02c773b4 100644 --- a/airflow/security/permissions.py +++ b/airflow/security/permissions.py @@ -66,14 +66,15 @@ DEPRECATED_ACTION_CAN_DAG_EDIT = "can_dag_edit" DAG_ACTIONS = {ACTION_CAN_READ, ACTION_CAN_EDIT, ACTION_CAN_DELETE} -def resource_name_for_dag(dag_id): - """Returns the resource name for a DAG id.""" - if dag_id == RESOURCE_DAG: - return dag_id +def resource_name_for_dag(root_dag_id: str) -> str: + """Returns the resource name for a DAG id. - if dag_id.startswith(RESOURCE_DAG_PREFIX): - return dag_id - - # To account for SubDags - root_dag_id = dag_id.split(".")[0] + Note that since a sub-DAG should follow the permission of its + parent DAG, you should pass ``DagModel.root_dag_id`` to this function, + for a subdag. A normal dag should pass the ``DagModel.dag_id``. + """ + if root_dag_id == RESOURCE_DAG: + return root_dag_id + if root_dag_id.startswith(RESOURCE_DAG_PREFIX): + return root_dag_id return f"{RESOURCE_DAG_PREFIX}{root_dag_id}" diff --git a/airflow/www/security.py b/airflow/www/security.py index 42188f0618..de6b0d646e 100644 --- a/airflow/www/security.py +++ b/airflow/www/security.py @@ -200,6 +200,16 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin): view.datamodel = CustomSQLAInterface(view.datamodel.obj) self.perms = None + def _get_root_dag_id(self, dag_id): + if '.' 
in dag_id: + dm = ( + self.get_session.query(DagModel.dag_id, DagModel.root_dag_id) + .filter(DagModel.dag_id == dag_id) + .first() + ) + return dm.root_dag_id or dm.dag_id + return dag_id + def init_role(self, role_name, perms): """ Initialize the role with actions and related resources. @@ -340,7 +350,8 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin): def can_access_some_dags(self, action: str, dag_id: Optional[str] = None) -> bool: """Checks if user has read or write access to some dags.""" if dag_id and dag_id != '~': - return self.has_access(action, permissions.resource_name_for_dag(dag_id)) + root_dag_id = self._get_root_dag_id(dag_id) + return self.has_access(action, permissions.resource_name_for_dag(root_dag_id)) user = g.user if action == permissions.ACTION_CAN_READ: @@ -349,17 +360,20 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin): def can_read_dag(self, dag_id, user=None) -> bool: """Determines whether a user has DAG read access.""" - dag_resource_name = permissions.resource_name_for_dag(dag_id) + root_dag_id = self._get_root_dag_id(dag_id) + dag_resource_name = permissions.resource_name_for_dag(root_dag_id) return self.has_access(permissions.ACTION_CAN_READ, dag_resource_name, user=user) def can_edit_dag(self, dag_id, user=None) -> bool: """Determines whether a user has DAG edit access.""" - dag_resource_name = permissions.resource_name_for_dag(dag_id) + root_dag_id = self._get_root_dag_id(dag_id) + dag_resource_name = permissions.resource_name_for_dag(root_dag_id) return self.has_access(permissions.ACTION_CAN_EDIT, dag_resource_name, user=user) def can_delete_dag(self, dag_id, user=None) -> bool: """Determines whether a user has DAG delete access.""" - dag_resource_name = permissions.resource_name_for_dag(dag_id) + root_dag_id = self._get_root_dag_id(dag_id) + dag_resource_name = permissions.resource_name_for_dag(root_dag_id) return self.has_access(permissions.ACTION_CAN_DELETE, dag_resource_name, user=user) def 
prefixed_dag_id(self, dag_id): @@ -370,7 +384,8 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin): DeprecationWarning, stacklevel=2, ) - return permissions.resource_name_for_dag(dag_id) + root_dag_id = self._get_root_dag_id(dag_id) + return permissions.resource_name_for_dag(root_dag_id) def is_dag_resource(self, resource_name): """Determines if a resource belongs to a DAG or all DAGs.""" @@ -530,7 +545,8 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin): dags = dagbag.dags.values() for dag in dags: - dag_resource_name = permissions.resource_name_for_dag(dag.dag_id) + root_dag_id = dag.parent_dag.dag_id if dag.parent_dag else dag.dag_id + dag_resource_name = permissions.resource_name_for_dag(root_dag_id) for action_name in self.DAG_ACTIONS: if (action_name, dag_resource_name) not in perms: self._merge_perm(action_name, dag_resource_name) @@ -615,6 +631,7 @@ class AirflowSecurityManager(SecurityManager, LoggingMixin): :param access_control: a dict where each key is a rolename and each value is a set() of action names (e.g. 
{'can_read'}) """ + dag_resource_name = permissions.resource_name_for_dag(dag_id) def _get_or_create_dag_permission(action_name: str) -> Optional[Permission]: diff --git a/tests/www/test_security.py b/tests/www/test_security.py index 8c90062600..7b8541ca81 100644 --- a/tests/www/test_security.py +++ b/tests/www/test_security.py @@ -192,7 +192,8 @@ def sample_dags(security_manager): @pytest.fixture(scope="module") def has_dag_perm(security_manager): def _has_dag_perm(perm, dag_id, user): - return security_manager.has_access(perm, permissions.resource_name_for_dag(dag_id), user) + root_dag_id = security_manager._get_root_dag_id(dag_id) + return security_manager.has_access(perm, permissions.resource_name_for_dag(root_dag_id), user) return _has_dag_perm @@ -351,7 +352,7 @@ def test_verify_anon_user_with_admin_role_has_access_to_each_dag( user.roles = security_manager.get_user_roles(user) assert user.roles == {security_manager.get_public_role()} - test_dag_ids = ["test_dag_id_1", "test_dag_id_2", "test_dag_id_3"] + test_dag_ids = ["test_dag_id_1", "test_dag_id_2", "test_dag_id_3", "test_dag_id_4.with_dot"] for dag_id in test_dag_ids: with _create_dag_model_context(dag_id, session, security_manager): @@ -588,7 +589,8 @@ def test_access_control_with_invalid_permission(app, security_manager): for action in invalid_actions: with pytest.raises(AirflowException) as ctx: security_manager._sync_dag_view_permissions( - 'access_control_test', access_control={rolename: {action}} + 'access_control_test', + access_control={rolename: {action}}, ) assert "invalid permissions" in str(ctx.value) @@ -728,11 +730,13 @@ def test_create_dag_specific_permissions(session, security_manager, monkeypatch, assert ('can_edit', dag_resource_name) in all_perms security_manager._sync_dag_view_permissions.assert_called_once_with( - permissions.resource_name_for_dag('has_access_control'), access_control + permissions.resource_name_for_dag('has_access_control'), + access_control, ) del 
dagbag_mock.dags["has_access_control"] - with assert_queries_count(1): # one query to get all perms; dagbag is mocked + with assert_queries_count(2): # two query to get all perms; dagbag is mocked + # The extra query happens at permission check security_manager.create_dag_specific_permissions() @@ -782,10 +786,12 @@ def test_prefixed_dag_id_is_deprecated(security_manager): security_manager.prefixed_dag_id("hello") -def test_parent_dag_access_applies_to_subdag(app, security_manager, assert_user_has_dag_perms): +def test_parent_dag_access_applies_to_subdag(app, security_manager, assert_user_has_dag_perms, session): username = 'dag_permission_user' role_name = 'dag_permission_role' parent_dag_name = "parent_dag" + subdag_name = parent_dag_name + ".subdag" + subsubdag_name = parent_dag_name + ".subdag.subsubdag" with app.app_context(): mock_roles = [ { @@ -801,15 +807,57 @@ def test_parent_dag_access_applies_to_subdag(app, security_manager, assert_user_ username=username, role_name=role_name, ) as user: + dag1 = DagModel(dag_id=parent_dag_name) + dag2 = DagModel(dag_id=subdag_name, is_subdag=True, root_dag_id=parent_dag_name) + dag3 = DagModel(dag_id=subsubdag_name, is_subdag=True, root_dag_id=parent_dag_name) + session.add_all([dag1, dag2, dag3]) + session.commit() security_manager.bulk_sync_roles(mock_roles) - security_manager._sync_dag_view_permissions( - parent_dag_name, access_control={role_name: READ_WRITE} - ) + for dag in [dag1, dag2, dag3]: + security_manager._sync_dag_view_permissions( + parent_dag_name, access_control={role_name: READ_WRITE} + ) + assert_user_has_dag_perms(perms=READ_WRITE, dag_id=parent_dag_name, user=user) assert_user_has_dag_perms(perms=READ_WRITE, dag_id=parent_dag_name + ".subdag", user=user) assert_user_has_dag_perms( perms=READ_WRITE, dag_id=parent_dag_name + ".subdag.subsubdag", user=user ) + session.query(DagModel).delete() + + +def test_permissions_work_for_dags_with_dot_in_dagname( + app, security_manager, 
assert_user_has_dag_perms, assert_user_does_not_have_dag_perms, session +): + username = 'dag_permission_user' + role_name = 'dag_permission_role' + dag_id = "dag_id_1" + dag_id_2 = "dag_id_1.with_dot" + with app.app_context(): + mock_roles = [ + { + 'role': role_name, + 'perms': [ + (permissions.ACTION_CAN_READ, f"DAG:{dag_id}"), + (permissions.ACTION_CAN_EDIT, f"DAG:{dag_id}"), + ], + } + ] + with create_user_scope( + app, + username=username, + role_name=role_name, + ) as user: + dag1 = DagModel(dag_id=dag_id) + dag2 = DagModel(dag_id=dag_id_2) + session.add_all([dag1, dag2]) + session.commit() + security_manager.bulk_sync_roles(mock_roles) + security_manager.sync_perm_for_dag(dag1.dag_id, access_control={role_name: READ_WRITE}) + security_manager.sync_perm_for_dag(dag2.dag_id, access_control={role_name: READ_WRITE}) + assert_user_has_dag_perms(perms=READ_WRITE, dag_id=dag_id, user=user) + assert_user_does_not_have_dag_perms(perms=READ_WRITE, dag_id=dag_id_2, user=user) + session.query(DagModel).delete() def test_fab_models_use_airflow_base_meta():