Jorricks commented on a change in pull request #16634:
URL: https://github.com/apache/airflow/pull/16634#discussion_r711362982



##########
File path: airflow/www/views.py
##########
@@ -3145,6 +3156,65 @@ class AirflowModelView(ModelView):
     CustomSQLAInterface = wwwutils.CustomSQLAInterface
 
 
+class AirflowPrivilegeVerifierModelView(AirflowModelView):
+    """
+    This ModelView prevents ability to pass primary keys of objects relating to DAGs you shouldn't be able to
+    edit. This only holds for the add, update and delete operations.
+    You will still need to use the `action_has_dag_edit_access()` for actions.
+    """
+
+    @staticmethod
+    def validate_dag_edit_access(item: Union[DagRun, TaskInstance]):
+        """Validates whether the user has 'can_edit' access for this specific DAG."""
+        if not current_app.appbuilder.sm.can_edit_dag(item.dag_id):
+            raise AirflowException(f"Access denied for dag_id {item.dag_id}")
+
+    def pre_add(self, item: Union[DagRun, TaskInstance]):
+        self.validate_dag_edit_access(item)
+
+    def pre_update(self, item: Union[DagRun, TaskInstance]):
+        self.validate_dag_edit_access(item)
+
+    def pre_delete(self, item: Union[DagRun, TaskInstance]):
+        self.validate_dag_edit_access(item)
+
+    def post_add_redirect(self):  # Required to prevent redirect loop
+        return redirect(self.get_default_url())
+
+    def post_edit_redirect(self):  # Required to prevent redirect loop
+        return redirect(self.get_default_url())
+
+    def post_delete_redirect(self):  # Required to prevent redirect loop
+        return redirect(self.get_default_url())
+
+
+def action_has_dag_edit_access(action_func: Callable) -> Callable:
+    """Decorator for actions which verifies you have DAG edit access on the given tis/drs."""
+
+    @wraps(action_func)
+    def check_dag_edit_acl_for_actions(
+        self,
+        items: Optional[Union[List[TaskInstance], List[DagRun], TaskInstance, DagRun, None]],
+        *args,
+        **kwargs,
+    ) -> None:
+        if items is None:
+            dag_ids: Set[str] = set()
+        elif isinstance(items, list):
+            dag_ids = {item.dag_id for item in items if item is not None}
+        else:
+            dag_ids = items.dag_id
+
+        for dag_id in dag_ids:

Review comment:
       Good catch! The last `else` branch should have produced a set instead of a single item.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to