aminghadersohi commented on code in PR #38407: URL: https://github.com/apache/superset/pull/38407#discussion_r2895039626
########## tests/unit_tests/mcp_service/test_auth_rbac.py: ########## @@ -0,0 +1,221 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +"""Tests for MCP RBAC permission checking (auth.py).""" + +from unittest.mock import MagicMock, patch + +import pytest +from flask import g + +from superset.mcp_service.auth import ( + check_tool_permission, + CLASS_PERMISSION_ATTR, + MCPPermissionDeniedError, + METHOD_PERMISSION_ATTR, + PERMISSION_PREFIX, +) + + +@pytest.fixture(autouse=True) +def enable_mcp_rbac(app): + """Re-enable RBAC for dedicated RBAC tests. + + The shared conftest disables RBAC for integration tests. This fixture + overrides that so we can test the actual permission checking logic. + """ + app.config["MCP_RBAC_ENABLED"] = True + yield + app.config.pop("MCP_RBAC_ENABLED", None) + + +class _ToolFunc: + """Minimal callable stand-in for a tool function in tests. + + Unlike MagicMock, this does NOT auto-create attributes on access, + so ``getattr(func, ATTR, None)`` properly returns None when the + attribute hasn't been set. 
+ """ + + def __init__(self, name: str = "test_tool"): + self.__name__ = name + + def __call__(self, *args, **kwargs): + pass + + +def _make_tool_func( + class_perm: str | None = None, + method_perm: str | None = None, +) -> _ToolFunc: + """Create a tool function stub with optional permission attributes.""" + func = _ToolFunc() + if class_perm is not None: + setattr(func, CLASS_PERMISSION_ATTR, class_perm) + if method_perm is not None: + setattr(func, METHOD_PERMISSION_ATTR, method_perm) + return func + + +# -- MCPPermissionDeniedError -- + + +def test_permission_denied_error_message_basic(): + err = MCPPermissionDeniedError( + permission_name="can_read", + view_name="Chart", + ) + assert "can_read" in str(err) + assert "Chart" in str(err) + assert err.permission_name == "can_read" + assert err.view_name == "Chart" + + +def test_permission_denied_error_message_with_user_and_tool(): + err = MCPPermissionDeniedError( + permission_name="can_write", + view_name="Dashboard", + user="alice", + tool_name="generate_dashboard", + ) + assert "alice" in str(err) + assert "generate_dashboard" in str(err) + assert "Dashboard" in str(err) + + +# -- check_tool_permission -- + + +def test_check_tool_permission_no_class_permission_allows(app_context): + """Tools without class_permission_name should be allowed by default.""" + g.user = MagicMock(username="admin") + func = _make_tool_func() # no class_permission_name + assert check_tool_permission(func) is True + + +def test_check_tool_permission_no_user_denies(app_context): + """If no g.user, permission check should deny.""" + g.user = None + func = _make_tool_func(class_perm="Chart") + assert check_tool_permission(func) is False + + +def test_check_tool_permission_granted(app_context): + """When security_manager.can_access returns True, permission is granted.""" + g.user = MagicMock(username="admin") + func = _make_tool_func(class_perm="Chart", method_perm="read") + + with patch("superset.extensions.appbuilder") as mock_appbuilder: 
+ mock_appbuilder.sm.can_access.return_value = True + result = check_tool_permission(func) + + assert result is True + mock_appbuilder.sm.can_access.assert_called_once_with("can_read", "Chart") + + +def test_check_tool_permission_denied(app_context): + """When security_manager.can_access returns False, permission is denied.""" + g.user = MagicMock(username="viewer") + func = _make_tool_func(class_perm="Dashboard", method_perm="write") + + with patch("superset.extensions.appbuilder") as mock_appbuilder: + mock_appbuilder.sm.can_access.return_value = False + result = check_tool_permission(func) + + assert result is False + mock_appbuilder.sm.can_access.assert_called_once_with("can_write", "Dashboard") + + +def test_check_tool_permission_default_method_is_read(app_context): + """When no method_permission_name is set, defaults to 'read'.""" + g.user = MagicMock(username="admin") + func = _make_tool_func(class_perm="Dataset") + # No method_perm set - should default to "read" + + with patch("superset.extensions.appbuilder") as mock_appbuilder: + mock_appbuilder.sm.can_access.return_value = True + check_tool_permission(func) Review Comment: Thanks — I'll add the return value assertion. ########## superset/mcp_service/chart/tool/update_chart_preview.py: ########## @@ -44,7 +44,7 @@ logger = logging.getLogger(__name__) -@tool(tags=["mutate"]) +@tool(tags=["mutate"], class_permission_name="Chart") Review Comment: Understood, no worries. 
########## superset/mcp_service/auth.py: ########## @@ -42,6 +43,90 @@ logger = logging.getLogger(__name__) +# Constants for RBAC permission attributes (mirrors FAB conventions) +PERMISSION_PREFIX = "can_" +CLASS_PERMISSION_ATTR = "_class_permission_name" +METHOD_PERMISSION_ATTR = "_method_permission_name" + + +class MCPPermissionDeniedError(Exception): + """Raised when user lacks required RBAC permission for an MCP tool.""" + + def __init__( + self, + permission_name: str, + view_name: str, + user: str | None = None, + tool_name: str | None = None, + ): + self.permission_name = permission_name + self.view_name = view_name + self.user = user + self.tool_name = tool_name + message = ( + f"Permission denied: {permission_name} on {view_name}" + + (f" for user {user}" if user else "") + + (f" (tool: {tool_name})" if tool_name else "") + ) + super().__init__(message) + + +def check_tool_permission(func: Callable[..., Any]) -> bool: + """Check if the current user has RBAC permission for an MCP tool. + + Reads permission metadata stored on the function by the @tool decorator + and uses Superset's security_manager to verify access. + + Controlled by the ``MCP_RBAC_ENABLED`` config flag (default True). + Set to False in superset_config.py to disable RBAC checking. + + Args: + func: The tool function with optional permission attributes. + + Returns: + True if user has permission or no permission is required. + """ + try: + from flask import current_app + + if not current_app.config.get("MCP_RBAC_ENABLED", True): + return True + + from superset import security_manager + + if not hasattr(g, "user") or not g.user: + logger.warning( + "No user context for permission check on tool: %s", func.__name__ + ) + return False + + class_permission_name = getattr(func, CLASS_PERMISSION_ATTR, None) + if not class_permission_name: + # No RBAC configured for this tool; allow by default. 
+ return True + + method_permission_name = getattr(func, METHOD_PERMISSION_ATTR, "read") + permission_str = f"{PERMISSION_PREFIX}{method_permission_name}" + + has_permission = security_manager.can_access( + permission_str, class_permission_name + ) + + if not has_permission: + logger.warning( + "Permission denied for user %s: %s on %s (tool: %s)", + g.user.username, + permission_str, + class_permission_name, + func.__name__, + ) + + return has_permission + + except Exception as e: Review Comment: Thanks for the code snippet — I'll narrow the exception handling to (AttributeError, ValueError) as suggested. ########## superset/mcp_service/mcp_config.py: ########## @@ -45,6 +45,10 @@ # MCP Debug mode - shows suppressed initialization output in stdio mode MCP_DEBUG = False +# MCP RBAC - when True, tools with class_permission_name are checked +# against the FAB security_manager before execution. +MCP_RBAC_ENABLED = True Review Comment: Thanks for confirming — I'll add MCP_RBAC_ENABLED to the defaults. ########## superset/mcp_service/auth.py: ########## @@ -294,6 +391,17 @@ def sync_wrapper(*args: Any, **kwargs: Any) -> Any: ) return tool_func(*args, **kwargs) + # RBAC permission check + if not check_tool_permission(tool_func): + raise MCPPermissionDeniedError( + permission_name=getattr( + tool_func, METHOD_PERMISSION_ATTR, "read" + ), Review Comment: Thanks for confirming the fix. Using the full FAB permission string matches what security_manager expects. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
