github-advanced-security[bot] commented on code in PR #35121: URL: https://github.com/apache/superset/pull/35121#discussion_r2344689372
########## superset/mcp_service/chart/schemas.py: ########## @@ -0,0 +1,1078 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Pydantic schemas for chart-related responses +""" + +from __future__ import annotations + +import html +import re +from datetime import datetime +from typing import Annotated, Any, Dict, List, Literal, Optional, Protocol + +from pydantic import ( + BaseModel, + ConfigDict, + Field, + field_validator, + model_validator, + PositiveInt, +) + +from superset.daos.base import ColumnOperator, ColumnOperatorEnum +from superset.mcp_service.common.cache_schemas import ( + CacheStatus, + FormDataCacheControl, + MetadataCacheControl, + QueryCacheControl, +) +from superset.mcp_service.system.schemas import ( + PaginationInfo, + TagInfo, + UserInfo, +) + + +class ChartLike(Protocol): + """Protocol for chart-like objects with expected attributes.""" + + id: int + slice_name: Optional[str] + viz_type: Optional[str] + datasource_name: Optional[str] + datasource_type: Optional[str] + url: Optional[str] + description: Optional[str] + cache_timeout: Optional[int] + form_data: Optional[Dict[str, Any]] + query_context: Optional[Any] + changed_by: Optional[Any] # User object + changed_by_name: Optional[str] + changed_on: Optional[str | datetime] + changed_on_humanized: Optional[str] + created_by: Optional[Any] # User object + created_by_name: Optional[str] + created_on: Optional[str | datetime] + created_on_humanized: Optional[str] + uuid: Optional[str] + tags: Optional[List[Any]] + owners: Optional[List[Any]] + + +class ChartInfo(BaseModel): + """Full chart model with all possible attributes.""" + + id: int = Field(..., description="Chart ID") + slice_name: str = Field(..., description="Chart name") + viz_type: Optional[str] = Field(None, description="Visualization type") + datasource_name: Optional[str] = Field(None, description="Datasource name") + datasource_type: Optional[str] = Field(None, description="Datasource type") + url: Optional[str] = Field(None, description="Chart URL") + description: Optional[str] = Field(None, description="Chart description") + cache_timeout: Optional[int] = Field(None, description="Cache timeout") + form_data: Optional[Dict[str, Any]] = Field(None, description="Chart form data") + query_context: Optional[Any] = Field(None, description="Query context") + changed_by: Optional[str] = Field(None, description="Last modifier (username)") + changed_by_name: Optional[str] = Field( + None, description="Last modifier (display name)" + ) + changed_on: Optional[str | datetime] = Field( + None, description="Last modification timestamp" + ) + changed_on_humanized: Optional[str] = Field( + None, description="Humanized modification time" + ) + created_by: Optional[str] = Field(None, description="Chart creator (username)") + created_on: Optional[str | datetime] = Field(None, description="Creation timestamp") + created_on_humanized: Optional[str] = Field( + None, description="Humanized creation time" + ) + uuid: Optional[str] = Field(None, description="Chart UUID") + tags: List[TagInfo] = Field(default_factory=list, description="Chart tags") + owners: List[UserInfo] = Field(default_factory=list, description="Chart owners") + model_config = ConfigDict(from_attributes=True, ser_json_timedelta="iso8601") + + +class GetChartAvailableFiltersRequest(BaseModel): + """ + Request schema for get_chart_available_filters tool. + + Currently has no parameters but provides consistent API for future extensibility. + """ + + model_config = ConfigDict( + extra="forbid", + str_strip_whitespace=True, + ) + + +class ChartAvailableFiltersResponse(BaseModel): + column_operators: Dict[str, Any] = Field( + ..., description="Available filter operators and metadata for each column" + ) + + +class ChartError(BaseModel): + error: str = Field(..., description="Error message") + error_type: str = Field(..., description="Type of error") + timestamp: Optional[str | datetime] = Field(None, description="Error timestamp") + model_config = ConfigDict(ser_json_timedelta="iso8601") + + @classmethod + def create(cls, error: str, error_type: str) -> "ChartError": + """Create a standardized ChartError with timestamp.""" + from datetime import datetime + + return cls(error=error, error_type=error_type, timestamp=datetime.now()) + + +class ChartCapabilities(BaseModel): + """Describes what the chart can do for LLM understanding.""" + + supports_interaction: bool = Field(description="Chart supports user interaction") + supports_real_time: bool = Field(description="Chart supports live data updates") + supports_drill_down: bool = Field( + description="Chart supports drill-down navigation" + ) + supports_export: bool = Field(description="Chart can be exported to other formats") + optimal_formats: List[str] = Field(description="Recommended preview formats") + data_types: List[str] = Field( + description="Types of data shown (time_series, categorical, etc)" + ) + + +class ChartSemantics(BaseModel): + """Semantic information for LLM reasoning.""" + + primary_insight: str = Field( + description="Main insight or pattern the chart reveals" + ) + data_story: str = Field(description="Narrative description of what the data shows") + recommended_actions: List[str] = Field( + description="Suggested next steps based on data" + ) + anomalies: List[str] = Field(description="Notable outliers or unusual patterns") + statistical_summary: Dict[str, Any] = Field( + description="Key statistics (mean, median, trends)" + ) + + +class PerformanceMetadata(BaseModel): + """Performance information for LLM cost understanding.""" + + query_duration_ms: int = Field(description="Query execution time") + estimated_cost: Optional[str] = Field(None, description="Resource cost estimate") + cache_status: str = Field(description="Cache hit/miss status") + optimization_suggestions: List[str] = Field( + default_factory=list, description="Performance improvement tips" + ) + + +class AccessibilityMetadata(BaseModel): + """Accessibility information for inclusive visualization.""" + + color_blind_safe: bool = Field(description="Uses colorblind-safe palette") + alt_text: str = Field(description="Screen reader description") + high_contrast_available: bool = Field(description="High contrast version available") + + +class VersionedResponse(BaseModel): + """Base class for versioned API responses.""" + + schema_version: str = Field("2.0", description="Response schema version") + api_version: str = Field("v1", description="MCP API version") + + +class GetChartInfoRequest(BaseModel): + """Request schema for get_chart_info with support for ID or UUID.""" + + identifier: Annotated[ + int | str, + Field(description="Chart identifier - can be numeric ID or UUID string"), + ] + + +def serialize_chart_object(chart: ChartLike | None) -> ChartInfo | None: + if not chart: + return None + + # Generate MCP service screenshot URL instead of chart's native URL + from superset.mcp_service.utils.url_utils import get_chart_screenshot_url + + chart_id = getattr(chart, "id", None) + screenshot_url = None + if chart_id: + screenshot_url = get_chart_screenshot_url(chart_id) + + return ChartInfo( + id=chart_id, + slice_name=getattr(chart, "slice_name", None), + viz_type=getattr(chart, "viz_type", None), + datasource_name=getattr(chart, "datasource_name", None), + datasource_type=getattr(chart, "datasource_type", None), + url=screenshot_url, + description=getattr(chart, "description", None), + cache_timeout=getattr(chart, "cache_timeout", None), + form_data=getattr(chart, "form_data", None), + query_context=getattr(chart, "query_context", None), + changed_by=getattr(chart, "changed_by_name", None) + or (str(chart.changed_by) if getattr(chart, "changed_by", None) else None), + changed_by_name=getattr(chart, "changed_by_name", None), + changed_on=getattr(chart, "changed_on", None), + changed_on_humanized=getattr(chart, "changed_on_humanized", None), + created_by=getattr(chart, "created_by_name", None) + or (str(chart.created_by) if getattr(chart, "created_by", None) else None), + created_on=getattr(chart, "created_on", None), + created_on_humanized=getattr(chart, "created_on_humanized", None), + uuid=str(getattr(chart, "uuid", "")) if getattr(chart, "uuid", None) else None, + tags=[ + TagInfo.model_validate(tag, from_attributes=True) + for tag in getattr(chart, "tags", []) + ] + if getattr(chart, "tags", None) + else [], + owners=[ + UserInfo.model_validate(owner, from_attributes=True) + for owner in getattr(chart, "owners", []) + ] + if getattr(chart, "owners", None) + else [], + ) + + +class GenerateChartResponse(BaseModel): + """Comprehensive chart creation response with rich metadata.""" + + # Core chart information + chart: Optional[ChartInfo] = Field(None, description="Complete chart metadata") + + # Multiple preview formats available + previews: Dict[str, ChartPreviewContent] = Field( + default_factory=dict, + description="Available preview formats keyed by format type", + ) + + # LLM-friendly capabilities + capabilities: Optional[ChartCapabilities] = Field( + None, description="Chart interaction capabilities" + ) + semantics: Optional[ChartSemantics] = Field( + None, description="Semantic chart understanding" + ) + + # Navigation and context + explore_url: Optional[str] = Field(None, description="Edit chart in Superset") + embed_code: Optional[str] = Field(None, description="HTML embed snippet") + api_endpoints: Dict[str, str] = Field( + default_factory=dict, description="Related API endpoints for data/updates" + ) + + # Performance and accessibility + performance: Optional[PerformanceMetadata] = Field( + None, description="Performance metrics" + ) + accessibility: Optional[AccessibilityMetadata] = Field( + None, description="Accessibility info" + ) + + # Success/error handling + success: bool = Field(True, description="Whether chart creation succeeded") + error: Optional[ChartError] = Field( + None, description="Error details if creation failed" + ) + warnings: List[str] = Field(default_factory=list, description="Non-fatal warnings") + + # Inherit versioning + schema_version: str = Field("2.0", description="Response schema version") + api_version: str = Field("v1", description="MCP API version") + + +class ChartFilter(ColumnOperator): + """ + Filter object for chart listing. + col: The column to filter on. Must be one of the allowed filter fields. + opr: The operator to use. Must be one of the supported operators. + value: The value to filter by (type depends on col and opr). + """ + + col: Literal[ + "slice_name", + "viz_type", + "datasource_name", + ] = Field( + ..., + description="Column to filter on. See get_chart_available_filters for " + "allowed values.", + ) + opr: ColumnOperatorEnum = Field( + ..., + description="Operator to use. See get_chart_available_filters for " + "allowed values.", + ) + value: str | int | float | bool | List[str | int | float | bool] = Field( + ..., description="Value to filter by (type depends on col and opr)" + ) + + +class ChartList(BaseModel): + charts: List[ChartInfo] + count: int + total_count: int + page: int + page_size: int + total_pages: int + has_previous: bool + has_next: bool + columns_requested: Optional[List[str]] = None + columns_loaded: Optional[List[str]] = None + filters_applied: List[ChartFilter] = Field( + default_factory=list, + description="List of advanced filter dicts applied to the query.", + ) + pagination: Optional[PaginationInfo] = None + timestamp: Optional[datetime] = None + model_config = ConfigDict(ser_json_timedelta="iso8601") + + +# --- Simplified schemas for generate_chart tool --- + + +# Common pieces +class ColumnRef(BaseModel): + name: str = Field( + ..., + description="Column name", + min_length=1, + max_length=255, + pattern=r"^[a-zA-Z0-9_][a-zA-Z0-9_\s\-\.]*$", + ) + label: Optional[str] = Field( + None, description="Display label for the column", max_length=500 + ) + dtype: Optional[str] = Field(None, description="Data type hint") + aggregate: Optional[ + Literal[ + "SUM", + "COUNT", + "AVG", + "MIN", + "MAX", + "COUNT_DISTINCT", + "STDDEV", + "VAR", + "MEDIAN", + "PERCENTILE", + ] + ] = Field( + None, + description="SQL aggregation function. Only these validated functions are " + "supported to prevent SQL errors.", + ) + + @field_validator("name") + @classmethod + def sanitize_name(cls, v: str) -> str: + """Sanitize column name to prevent XSS and SQL injection.""" + if not v or not v.strip(): + raise ValueError("Column name cannot be empty") + + # Remove HTML tags and decode entities + sanitized = html.escape(v.strip()) + + # Check for script content + if re.search(r"<script[^>]*>.*?</script>", v, re.IGNORECASE | re.DOTALL): + raise ValueError( + "Column name contains potentially malicious script content" + ) + + # Basic SQL injection patterns (basic protection) + dangerous_patterns = [ + r"(;|\||&|\$|`)", + r"\b(DROP|DELETE|INSERT|UPDATE|CREATE|ALTER|EXEC|EXECUTE)\b", + r"--", + r"/\*.*\*/", + ] + + for pattern in dangerous_patterns: + if re.search(pattern, v, re.IGNORECASE): + raise ValueError( + "Column name contains potentially unsafe characters or SQL keywords" + ) + + return sanitized + + @field_validator("label") + @classmethod + def sanitize_label(cls, v: Optional[str]) -> Optional[str]: + """Sanitize display label to prevent XSS attacks.""" + if v is None: + return v + + # Strip whitespace + v = v.strip() + if not v: + return None + + # Check for dangerous HTML tags and JavaScript protocols BEFORE escaping + dangerous_patterns = [ + r"<script[^>]*>.*?</script>", # Script tags + r"<iframe[^>]*>.*?</iframe>", # Iframe tags + r"<object[^>]*>.*?</object>", # Object tags + r"<embed[^>]*>.*?</embed>", # Embed tags + r"<link[^>]*>", # Link tags + r"<meta[^>]*>", # Meta tags + r"javascript:", # JavaScript protocol + r"vbscript:", # VBScript protocol + r"data:text/html", # Data URL HTML + r"on\w+\s*=", # Event handlers (onclick, onload, etc) + ] + + for pattern in dangerous_patterns: + if re.search(pattern, v, re.IGNORECASE | re.DOTALL): + raise ValueError( + "Label contains potentially malicious content. " + "HTML tags, JavaScript, and event handlers are not allowed in " + "labels." + ) + + # Filter dangerous Unicode characters + v = re.sub( + r"[\u200B-\u200D\uFEFF\u0000-\u0008\u000B\u000C\u000E-\u001F]", "", v + ) + + # HTML escape the cleaned content + sanitized = html.escape(v) + + return sanitized if sanitized else None + + +class AxisConfig(BaseModel): + title: Optional[str] = Field(None, description="Axis title", max_length=200) + scale: Optional[Literal["linear", "log"]] = Field( + "linear", description="Axis scale type" + ) + format: Optional[str] = Field( + None, description="Format string (e.g. '$,.2f')", max_length=50 + ) + + +class LegendConfig(BaseModel): + show: bool = Field(True, description="Whether to show legend") + position: Optional[Literal["top", "bottom", "left", "right"]] = Field( + "right", description="Legend position" + ) + + +class FilterConfig(BaseModel): + column: str = Field( + ..., description="Column to filter on", min_length=1, max_length=255 + ) + op: Literal["=", ">", "<", ">=", "<=", "!="] = Field( + ..., description="Filter operator" + ) + value: str | int | float | bool = Field(..., description="Filter value") + + @field_validator("column") + @classmethod + def sanitize_column(cls, v: str) -> str: + """Sanitize filter column name to prevent injection attacks.""" + if not v or not v.strip(): + raise ValueError("Filter column name cannot be empty") + + # Remove HTML tags and decode entities + sanitized = html.escape(v.strip()) + + # Check for dangerous patterns + if re.search(r"<script[^>]*>.*?</script>", v, re.IGNORECASE | re.DOTALL): Review Comment: ## Bad HTML filtering regexp This regular expression does not match script end tags like </script >. [Show more details](https://github.com/apache/superset/security/code-scanning/2045) ########## superset/mcp_service/chart/schemas.py: ########## @@ -0,0 +1,1078 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +""" +Pydantic schemas for chart-related responses +""" + +from __future__ import annotations + +import html +import re +from datetime import datetime +from typing import Annotated, Any, Dict, List, Literal, Optional, Protocol + +from pydantic import ( + BaseModel, + ConfigDict, + Field, + field_validator, + model_validator, + PositiveInt, +) + +from superset.daos.base import ColumnOperator, ColumnOperatorEnum +from superset.mcp_service.common.cache_schemas import ( + CacheStatus, + FormDataCacheControl, + MetadataCacheControl, + QueryCacheControl, +) +from superset.mcp_service.system.schemas import ( + PaginationInfo, + TagInfo, + UserInfo, +) + + +class ChartLike(Protocol): + """Protocol for chart-like objects with expected attributes.""" + + id: int + slice_name: Optional[str] + viz_type: Optional[str] + datasource_name: Optional[str] + datasource_type: Optional[str] + url: Optional[str] + description: Optional[str] + cache_timeout: Optional[int] + form_data: Optional[Dict[str, Any]] + query_context: Optional[Any] + changed_by: Optional[Any] # User object + changed_by_name: Optional[str] + changed_on: Optional[str | datetime] + changed_on_humanized: Optional[str] + created_by: Optional[Any] # User object + created_by_name: Optional[str] + created_on: Optional[str | datetime] + created_on_humanized: Optional[str] + uuid: Optional[str] + tags: Optional[List[Any]] + owners: Optional[List[Any]] + + +class ChartInfo(BaseModel): + """Full chart model with all possible attributes.""" + + id: int = Field(..., description="Chart ID") + slice_name: str = Field(..., description="Chart name") + viz_type: Optional[str] = Field(None, description="Visualization type") + datasource_name: Optional[str] = Field(None, description="Datasource name") + datasource_type: Optional[str] = Field(None, description="Datasource type") + url: Optional[str] = Field(None, description="Chart URL") + description: Optional[str] = Field(None, description="Chart description") + cache_timeout: Optional[int] = Field(None, description="Cache timeout") + form_data: Optional[Dict[str, Any]] = Field(None, description="Chart form data") + query_context: Optional[Any] = Field(None, description="Query context") + changed_by: Optional[str] = Field(None, description="Last modifier (username)") + changed_by_name: Optional[str] = Field( + None, description="Last modifier (display name)" + ) + changed_on: Optional[str | datetime] = Field( + None, description="Last modification timestamp" + ) + changed_on_humanized: Optional[str] = Field( + None, description="Humanized modification time" + ) + created_by: Optional[str] = Field(None, description="Chart creator (username)") + created_on: Optional[str | datetime] = Field(None, description="Creation timestamp") + created_on_humanized: Optional[str] = Field( + None, description="Humanized creation time" + ) + uuid: Optional[str] = Field(None, description="Chart UUID") + tags: List[TagInfo] = Field(default_factory=list, description="Chart tags") + owners: List[UserInfo] = Field(default_factory=list, description="Chart owners") + model_config = ConfigDict(from_attributes=True, ser_json_timedelta="iso8601") + + +class GetChartAvailableFiltersRequest(BaseModel): + """ + Request schema for get_chart_available_filters tool. + + Currently has no parameters but provides consistent API for future extensibility. + """ + + model_config = ConfigDict( + extra="forbid", + str_strip_whitespace=True, + ) + + +class ChartAvailableFiltersResponse(BaseModel): + column_operators: Dict[str, Any] = Field( + ..., description="Available filter operators and metadata for each column" + ) + + +class ChartError(BaseModel): + error: str = Field(..., description="Error message") + error_type: str = Field(..., description="Type of error") + timestamp: Optional[str | datetime] = Field(None, description="Error timestamp") + model_config = ConfigDict(ser_json_timedelta="iso8601") + + @classmethod + def create(cls, error: str, error_type: str) -> "ChartError": + """Create a standardized ChartError with timestamp.""" + from datetime import datetime + + return cls(error=error, error_type=error_type, timestamp=datetime.now()) + + +class ChartCapabilities(BaseModel): + """Describes what the chart can do for LLM understanding.""" + + supports_interaction: bool = Field(description="Chart supports user interaction") + supports_real_time: bool = Field(description="Chart supports live data updates") + supports_drill_down: bool = Field( + description="Chart supports drill-down navigation" + ) + supports_export: bool = Field(description="Chart can be exported to other formats") + optimal_formats: List[str] = Field(description="Recommended preview formats") + data_types: List[str] = Field( + description="Types of data shown (time_series, categorical, etc)" + ) + + +class ChartSemantics(BaseModel): + """Semantic information for LLM reasoning.""" + + primary_insight: str = Field( + description="Main insight or pattern the chart reveals" + ) + data_story: str = Field(description="Narrative description of what the data shows") + recommended_actions: List[str] = Field( + description="Suggested next steps based on data" + ) + anomalies: List[str] = Field(description="Notable outliers or unusual patterns") + statistical_summary: Dict[str, Any] = Field( + description="Key statistics (mean, median, trends)" + ) + + +class PerformanceMetadata(BaseModel): + """Performance information for LLM cost understanding.""" + + query_duration_ms: int = Field(description="Query execution time") + estimated_cost: Optional[str] = Field(None, description="Resource cost estimate") + cache_status: str = Field(description="Cache hit/miss status") + optimization_suggestions: List[str] = Field( + default_factory=list, description="Performance improvement tips" + ) + + +class AccessibilityMetadata(BaseModel): + """Accessibility information for inclusive visualization.""" + + color_blind_safe: bool = Field(description="Uses colorblind-safe palette") + alt_text: str = Field(description="Screen reader description") + high_contrast_available: bool = Field(description="High contrast version available") + + +class VersionedResponse(BaseModel): + """Base class for versioned API responses.""" + + schema_version: str = Field("2.0", description="Response schema version") + api_version: str = Field("v1", description="MCP API version") + + +class GetChartInfoRequest(BaseModel): + """Request schema for get_chart_info with support for ID or UUID.""" + + identifier: Annotated[ + int | str, + Field(description="Chart identifier - can be numeric ID or UUID string"), + ] + + +def serialize_chart_object(chart: ChartLike | None) -> ChartInfo | None: + if not chart: + return None + + # Generate MCP service screenshot URL instead of chart's native URL + from superset.mcp_service.utils.url_utils import get_chart_screenshot_url + + chart_id = getattr(chart, "id", None) + screenshot_url = None + if chart_id: + screenshot_url = get_chart_screenshot_url(chart_id) + + return ChartInfo( + id=chart_id, + slice_name=getattr(chart, "slice_name", None), + viz_type=getattr(chart, "viz_type", None), + datasource_name=getattr(chart, "datasource_name", None), + datasource_type=getattr(chart, "datasource_type", None), + url=screenshot_url, + description=getattr(chart, "description", None), + cache_timeout=getattr(chart, "cache_timeout", None), + form_data=getattr(chart, "form_data", None), + query_context=getattr(chart, "query_context", None), + changed_by=getattr(chart, "changed_by_name", None) + or (str(chart.changed_by) if getattr(chart, "changed_by", None) else None), + changed_by_name=getattr(chart, "changed_by_name", None), + changed_on=getattr(chart, "changed_on", None), + changed_on_humanized=getattr(chart, "changed_on_humanized", None), + created_by=getattr(chart, "created_by_name", None) + or (str(chart.created_by) if getattr(chart, "created_by", None) else None), + created_on=getattr(chart, "created_on", None), + created_on_humanized=getattr(chart, "created_on_humanized", None), + uuid=str(getattr(chart, "uuid", "")) if getattr(chart, "uuid", None) else None, + tags=[ + TagInfo.model_validate(tag, from_attributes=True) + for tag in getattr(chart, "tags", []) + ] + if getattr(chart, "tags", None) + else [], + owners=[ + UserInfo.model_validate(owner, from_attributes=True) + for owner in getattr(chart, "owners", []) + ] + if getattr(chart, "owners", None) + else [], + ) + + +class GenerateChartResponse(BaseModel): + """Comprehensive chart creation response with rich metadata.""" + + # Core chart information + chart: Optional[ChartInfo] = Field(None, description="Complete chart metadata") + + # Multiple preview formats available + previews: Dict[str, ChartPreviewContent] = Field( + default_factory=dict, + description="Available preview formats keyed by format type", + ) + + # LLM-friendly capabilities + capabilities: Optional[ChartCapabilities] = Field( + None, description="Chart interaction capabilities" + ) + semantics: Optional[ChartSemantics] = Field( + None, description="Semantic chart understanding" + ) + + # Navigation and context + explore_url: Optional[str] = Field(None, description="Edit chart in Superset") + embed_code: Optional[str] = Field(None, description="HTML embed snippet") + api_endpoints: Dict[str, str] = Field( + default_factory=dict, description="Related API endpoints for data/updates" + ) + + # Performance and accessibility + performance: Optional[PerformanceMetadata] = Field( + None, description="Performance metrics" + ) + accessibility: Optional[AccessibilityMetadata] = Field( + None, description="Accessibility info" + ) + + # Success/error handling + success: bool = Field(True, description="Whether chart creation succeeded") + error: Optional[ChartError] = Field( + None, description="Error details if creation failed" + ) + warnings: List[str] = Field(default_factory=list, description="Non-fatal warnings") + + # Inherit versioning + schema_version: str = Field("2.0", description="Response schema version") + api_version: str = Field("v1", description="MCP API version") + + +class ChartFilter(ColumnOperator): + """ + Filter object for chart listing. + col: The column to filter on. Must be one of the allowed filter fields. + opr: The operator to use. Must be one of the supported operators. + value: The value to filter by (type depends on col and opr). + """ + + col: Literal[ + "slice_name", + "viz_type", + "datasource_name", + ] = Field( + ..., + description="Column to filter on. See get_chart_available_filters for " + "allowed values.", + ) + opr: ColumnOperatorEnum = Field( + ..., + description="Operator to use. See get_chart_available_filters for " + "allowed values.", + ) + value: str | int | float | bool | List[str | int | float | bool] = Field( + ..., description="Value to filter by (type depends on col and opr)" + ) + + +class ChartList(BaseModel): + charts: List[ChartInfo] + count: int + total_count: int + page: int + page_size: int + total_pages: int + has_previous: bool + has_next: bool + columns_requested: Optional[List[str]] = None + columns_loaded: Optional[List[str]] = None + filters_applied: List[ChartFilter] = Field( + default_factory=list, + description="List of advanced filter dicts applied to the query.", + ) + pagination: Optional[PaginationInfo] = None + timestamp: Optional[datetime] = None + model_config = ConfigDict(ser_json_timedelta="iso8601") + + +# --- Simplified schemas for generate_chart tool --- + + +# Common pieces +class ColumnRef(BaseModel): + name: str = Field( + ..., + description="Column name", + min_length=1, + max_length=255, + pattern=r"^[a-zA-Z0-9_][a-zA-Z0-9_\s\-\.]*$", + ) + label: Optional[str] = Field( + None, description="Display label for the column", max_length=500 + ) + dtype: Optional[str] = Field(None, description="Data type hint") + aggregate: Optional[ + Literal[ + "SUM", + "COUNT", + "AVG", + "MIN", + "MAX", + "COUNT_DISTINCT", + "STDDEV", + "VAR", + "MEDIAN", + "PERCENTILE", + ] + ] = Field( + None, + description="SQL aggregation function. Only these validated functions are " + "supported to prevent SQL errors.", + ) + + @field_validator("name") + @classmethod + def sanitize_name(cls, v: str) -> str: + """Sanitize column name to prevent XSS and SQL injection.""" + if not v or not v.strip(): + raise ValueError("Column name cannot be empty") + + # Remove HTML tags and decode entities + sanitized = html.escape(v.strip()) + + # Check for script content + if re.search(r"<script[^>]*>.*?</script>", v, re.IGNORECASE | re.DOTALL): Review Comment: ## Bad HTML filtering regexp This regular expression does not match script end tags like </script >. [Show more details](https://github.com/apache/superset/security/code-scanning/2044) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
