This is an automated email from the ASF dual-hosted git repository.
djwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudberry.git
The following commit(s) were added to refs/heads/main by this push:
new 1d9a1f25492 Feature: add MCP server for Cloudberry
1d9a1f25492 is described below
commit 1d9a1f25492763f219c8c753fb938730f5271750
Author: Shengwen Yang <[email protected]>
AuthorDate: Fri Sep 19 15:42:02 2025 +0800
Feature: add MCP server for Cloudberry
Add a comprehensive Model Context Protocol (MCP) server for Apache
Cloudberry enabling seamless integration with LLM applications.
This implementation provides secure database interaction capabilities
through AI-ready interfaces, supporting both stdio and HTTP transport
modes for maximum compatibility with various LLM clients.
**What:**
- Complete MCP server implementation with async database operations
- Safe SQL query execution with parameterized queries
- Administrative tools for performance monitoring and optimization
- Predefined prompts for common database tasks
- Security-first design with SQL injection prevention
**Why:**
- Enable AI assistants to interact with Cloudberry databases safely
- Provide standardized interface for LLM applications
- Reduce manual database management overhead
- Support modern AI development workflows
**How:**
- Built with asyncpg for high-performance PostgreSQL operations
- Implements MCP protocol for universal LLM compatibility
- Uses environment-based configuration for flexible deployment
- Includes comprehensive test suite and documentation
**Compatibility:**
- Requires Python 3.10+ with asyncpg support
- Compatible with Claude Desktop, Cursor, Windsurf, VS Code, Trae, Qwen
Desktop, etc.
- Supports both stdio and HTTP transport modes
- Works with standard PostgreSQL connection strings
**Breaking Changes:**
None - this is a new addition.
---
mcp-server/README.md | 399 +++++++++++++++++
mcp-server/dotenv.example | 31 ++
mcp-server/pyproject.toml | 70 +++
mcp-server/pytest.ini | 32 ++
mcp-server/run_tests.sh | 38 ++
mcp-server/src/cbmcp/__init__.py | 32 ++
mcp-server/src/cbmcp/__main__.py | 27 ++
mcp-server/src/cbmcp/client.py | 158 +++++++
mcp-server/src/cbmcp/config.py | 73 +++
mcp-server/src/cbmcp/database.py | 773 ++++++++++++++++++++++++++++++++
mcp-server/src/cbmcp/prompt.py | 62 +++
mcp-server/src/cbmcp/security.py | 96 ++++
mcp-server/src/cbmcp/server.py | 551 +++++++++++++++++++++++
mcp-server/tests/README.md | 115 +++++
mcp-server/tests/test_cbmcp.py | 338 ++++++++++++++
mcp-server/tests/test_database_tools.py | 303 +++++++++++++
16 files changed, 3098 insertions(+)
diff --git a/mcp-server/README.md b/mcp-server/README.md
new file mode 100644
index 00000000000..76c8776e722
--- /dev/null
+++ b/mcp-server/README.md
@@ -0,0 +1,399 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# Apache Cloudberry MCP Server
+
+A Model Context Protocol (MCP) server for Apache Cloudberry database
interaction, providing secure and efficient database management capabilities
through AI-ready interfaces.
+
+## Features
+
+- **Database Metadata Resources**: Access schemas, tables, views, indexes, and
column information
+- **Safe Query Tools**: Execute parameterized SQL queries with security
validation
+- **Administrative Tools**: Table statistics, large table analysis, and query
optimization
+- **Context-Aware Prompts**: Predefined prompts for common database tasks
+- **Security-First Design**: SQL injection prevention, read-only constraints,
and connection pooling
+- **Async Performance**: Built with asyncpg for high-performance database
operations
+
+## Prerequisites
+
+- Python 3.10+
+- uv (for dependency management)
+
+## Installation
+
+### Install uv
+
+```bash
+curl -sSfL https://astral.sh/uv/install.sh | sh
+```
+
+### Install Dependencies
+
+```bash
+cd mcp-server
+uv venv
+source .venv/bin/activate
+uv sync
+```
+
+### Install Project
+
+```bash
+uv pip install -e .
+```
+
+### Build Project
+
+```bash
+uv build
+```
+
+## Configuration
+
+Create a `.env` file in the project root:
+
+```env
+# Database Configuration
+DB_HOST=localhost
+DB_PORT=5432
+DB_NAME=postgres
+DB_USER=postgres
+DB_PASSWORD=your_password
+
+# Server Configuration
+MCP_HOST=localhost
+MCP_PORT=8000
+MCP_DEBUG=false
+```
+
+## Usage
+
+### Running the Server
+
+```bash
+# Run the MCP server
+python -m cbmcp.server
+
+# Or run with cloudberry-mcp-server
+cloudberry-mcp-server
+
+# Or run with custom configuration
+MCP_HOST=0.0.0.0 MCP_PORT=8080 python -m cbmcp.server
+```
+
+### Testing the Client
+
+```bash
+# Run the test client
+python -m cbmcp.client
+```
+
+## API Reference
+
+### Resources
+
+- `postgres://schemas` - List all database schemas
+- `postgres://database/info` - Get general database info
+- `postgres://database/summary` - Get detailed database summary
+
+### Tools
+
+#### Query Tools
+- `execute_query(query, params, readonly)` - Execute a SQL query
+- `explain_query(query, params)` - Get query execution plan
+- `get_table_stats(schema, table)` - Get table statistics
+- `list_large_tables(limit)` - List largest tables
+
+#### User & Permission Management
+- `list_users()` - List all database users
+- `list_user_permissions(username)` - List permissions for a specific user
+- `list_table_privileges(schema, table)` - List privileges for a specific table
+
+#### Schema & Structure
+- `list_constraints(schema, table)` - List constraints for a table
+- `list_foreign_keys(schema, table)` - List foreign keys for a table
+- `list_referenced_tables(schema, table)` - List tables that reference this
table
+- `get_table_ddl(schema, table)` - Get DDL statement for a table
+
+#### Performance & Monitoring
+- `get_slow_queries(limit)` - List slow queries
+- `get_index_usage()` - Analyze index usage statistics
+- `get_table_bloat_info()` - Analyze table bloat information
+- `get_database_activity()` - Show current database activity
+- `get_vacuum_info()` - Get vacuum and analyze statistics
+
+#### Database Objects
+- `list_functions(schema)` - List functions in a schema
+- `get_function_definition(schema, function)` - Get function definition
+- `list_triggers(schema, table)` - List triggers for a table
+- `list_materialized_views(schema)` - List materialized views in a schema
+- `list_active_connections()` - List active database connections
+
+### Prompts
+
+- `analyze_query_performance` - Query optimization assistance
+- `suggest_indexes` - Index recommendation guidance
+- `database_health_check` - Database health assessment
+
+## Security Features
+
+- **SQL Injection Prevention**: Comprehensive query validation
+- **Read-Only Constraints**: Configurable write protection
+- **Parameterized Queries**: Safe parameter handling
+- **Connection Pooling**: Secure connection management
+- **Sensitive Table Protection**: Blocks access to system tables
+
+
+## Quick Start with Cloudberry Demo Cluster
+
+This section shows how to quickly set up and test the Cloudberry MCP Server
using a local Cloudberry demo cluster. This is ideal for development and
testing purposes.
+
+Assume you already have a running [Cloudberry demo
cluster](https://cloudberry.apache.org/docs/deployment/set-demo-cluster) and
install & build MCP server as described above.
+
+1. Configure local connections in `pg_hba.conf`
+
+**Note**: This configuration is for demo purposes only. Do not use `trust`
authentication in production environments.
+
+```bash
+[gpadmin@cdw]$ vi
~/cloudberry/gpAux/gpdemo/datadirs/qddir/demoDataDir-1/pg_hba.conf
+```
+
+Add the following lines to the end of the pg_hba.conf:
+
+```
+# IPv4 local connections
+host all all 127.0.0.1/32 trust
+# IPv6 local connections
+host all all ::1/128 trust
+```
+
+After modifying `pg_hba.conf`, reload the configuration parameters:
+```bash
+[gpadmin@cdw]$ gpstop -u
+```
+
+2. Create environment configuration
+
+Create a `.env` in the project root directory:
+
+```
+# Database Configuration (Demo cluster defaults)
+DB_HOST=localhost
+DB_PORT=7000
+DB_NAME=postgres
+DB_USER=gpadmin
+# No password required for demo cluster
+
+# Server Configuration
+MCP_HOST=localhost
+MCP_PORT=8000
+MCP_DEBUG=false
+```
+
+3. Start the MCP server
+
+```bash
+MCP_HOST=0.0.0.0 MCP_PORT=8000 python -m cbmcp.server
+```
+
+You should see output indicating the server is running:
+```
+[09/17/25 14:07:50] INFO Starting MCP server 'Apache Cloudberry MCP
Server' with transport server.py:1572
+ 'streamable-http' on http://0.0.0.0:8000/mcp/
+```
+
+4. Configure your MCP client.
+
+Add the following server configuration to your MCP client:
+
+- Server Type: Streamable-HTTP
+- URL: http://[YOUR_HOST_IP]:8000/mcp
+
+Replace `[YOUR_HOST_IP]` with your actual host IP address.
+
+
+## LLM Client Integration
+
+### Claude Desktop Configuration
+
+Add the following configuration to your Claude Desktop configuration file:
+
+#### Stdio Transport (Recommended)
+
+```json
+{
+ "mcpServers": {
+ "cloudberry-mcp-server": {
+ "command": "uvx",
+ "args": [
+ "--with",
+ "PATH/TO/cbmcp-0.1.0-py3-none-any.whl",
+ "python",
+ "-m",
+ "cbmcp.server",
+ "--mode",
+ "stdio"
+ ],
+ "env": {
+ "DB_HOST": "localhost",
+ "DB_PORT": "5432",
+ "DB_NAME": "dvdrental",
+ "DB_USER": "yangshengwen",
+ "DB_PASSWORD": ""
+ }
+ }
+ }
+}
+```
+
+#### HTTP Transport
+
+```json
+{
+ "mcpServers": {
+ "cloudberry-mcp-server": {
+ "type": "streamable-http",
+      "url": "http://localhost:8000/mcp/",
+ "headers": {
+ "Authorization": ""
+ }
+ }
+ }
+}
+```
+
+### Cursor Configuration
+
+For Cursor IDE, add the configuration to your `.cursor/mcp.json` file:
+
+```json
+{
+ "mcpServers": {
+ "cloudberry-mcp": {
+ "command": "uvx",
+ "args": ["--with", "cbmcp", "python", "-m", "cbmcp.server", "--mode",
"stdio"],
+ "env": {
+ "DB_HOST": "localhost",
+ "DB_PORT": "5432",
+ "DB_NAME": "dvdrental",
+ "DB_USER": "postgres",
+ "DB_PASSWORD": "your_password"
+ }
+ }
+ }
+}
+```
+
+### Windsurf Configuration
+
+For Windsurf IDE, configure in your settings:
+
+```json
+{
+ "mcp": {
+ "servers": {
+ "cloudberry-mcp": {
+ "type": "stdio",
+ "command": "uvx",
+ "args": ["--with", "cbmcp", "python", "-m", "cbmcp.server", "--mode",
"stdio"],
+ "env": {
+ "DB_HOST": "localhost",
+ "DB_PORT": "5432",
+ "DB_NAME": "dvdrental",
+ "DB_USER": "postgres",
+ "DB_PASSWORD": "your_password"
+ }
+ }
+ }
+ }
+}
+```
+
+### VS Code with Cline
+
+For VS Code with the Cline extension, add to your settings:
+
+```json
+{
+ "cline.mcpServers": {
+ "cloudberry-mcp": {
+ "command": "uvx",
+ "args": ["--with", "cbmcp", "python", "-m", "cbmcp.server", "--mode",
"stdio"],
+ "env": {
+ "DB_HOST": "localhost",
+ "DB_PORT": "5432",
+ "DB_NAME": "dvdrental",
+ "DB_USER": "postgres",
+ "DB_PASSWORD": "your_password"
+ }
+ }
+ }
+}
+```
+
+### Installation via pip
+
+If you prefer to install the package globally instead of using uvx:
+
+```bash
+# Install the package
+pip install cbmcp-0.1.0-py3-none-any.whl
+
+# Or using pip install from source
+pip install -e .
+
+# Then use in configuration
+{
+ "command": "python",
+ "args": ["-m", "cbmcp.server", "--mode", "stdio"]
+}
+```
+
+### Environment Variables
+
+All configurations support the following environment variables:
+
+- `DB_HOST`: Database host (default: localhost)
+- `DB_PORT`: Database port (default: 5432)
+- `DB_NAME`: Database name (default: postgres)
+- `DB_USER`: Database username
+- `DB_PASSWORD`: Database password
+- `MCP_HOST`: Server host for HTTP mode (default: localhost)
+- `MCP_PORT`: Server port for HTTP mode (default: 8000)
+- `MCP_DEBUG`: Enable debug logging (default: false)
+
+### Troubleshooting
+
+#### Common Issues
+
+1. **Connection refused**: Ensure Apache Cloudberry is running and accessible
+2. **Authentication failed**: Check database credentials in environment
variables
+3. **Module not found**: Ensure the package is installed correctly
+4. **Permission denied**: Check file permissions for the package
+
+#### Debug Mode
+
+Enable debug logging by setting:
+```bash
+export MCP_DEBUG=true
+```
+
+## License
+
+Apache License 2.0
\ No newline at end of file
diff --git a/mcp-server/dotenv.example b/mcp-server/dotenv.example
new file mode 100644
index 00000000000..896cb33a539
--- /dev/null
+++ b/mcp-server/dotenv.example
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Example environment configuration
+# Copy this file to .env and update with your actual values
+
+# Database Configuration
+DB_HOST=localhost
+DB_PORT=5432
+DB_NAME=postgres
+DB_USER=postgres
+DB_PASSWORD=your_password_here
+
+# Server Configuration
+MCP_HOST=localhost
+MCP_PORT=8000
+MCP_DEBUG=false
diff --git a/mcp-server/pyproject.toml b/mcp-server/pyproject.toml
new file mode 100644
index 00000000000..984cb5e12a1
--- /dev/null
+++ b/mcp-server/pyproject.toml
@@ -0,0 +1,70 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[project]
+name = "cloudberry-mcp-server"
+version = "0.1.0"
+description = "MCP server for Apache Cloudberry database interaction"
+readme = "README.md"
+requires-python = ">=3.10"
+authors = [
+ {name = "Shengwen Yang", email = "[email protected]"},
+]
+maintainers = [
+ {name = "Shengwen Yang", email = "[email protected]"},
+]
+license = {text = "Apache License 2.0"}
+keywords = ["mcp", "cloudberry", "postgresql", "database", "server", "ai"]
+classifiers = [
+ "Development Status :: 3 - Alpha",
+ "Intended Audience :: Developers",
+    "License :: OSI Approved :: Apache Software License",
+ "Programming Language :: Python :: 3",
+ "Programming Language :: Python :: 3.8",
+ "Programming Language :: Python :: 3.9",
+ "Programming Language :: Python :: 3.10",
+ "Programming Language :: Python :: 3.11",
+ "Programming Language :: Python :: 3.12",
+ "Programming Language :: Python :: 3.13",
+ "Topic :: Database",
+ "Topic :: Software Development :: Libraries :: Python Modules",
+ "Topic :: Database :: Front-Ends",
+]
+dependencies = [
+ "fastmcp>=2.10.6",
+ "psycopg2-binary==2.9.10",
+ "asyncpg>=0.29.0",
+ "pydantic>=2.0.0",
+ "python-dotenv>=1.0.0",
+ "aiohttp>=3.12.15",
+ "starlette>=0.27.0",
+]
+
+[project.optional-dependencies]
+dev = [
+ "pytest>=7.0.0",
+ "pytest-asyncio>=0.21.0",
+ "pytest-cov>=4.0.0",
+]
+
+[project.urls]
+Homepage = "https://github.com/apache/cloudberry/tree/main/mcp-server"
+Repository = "https://github.com/apache/cloudberry"
+Documentation =
"https://github.com/apache/cloudberry/blob/main/mcp-server/README.md"
+
+[project.scripts]
+cloudberry-mcp-server = "cbmcp.server:main"
diff --git a/mcp-server/pytest.ini b/mcp-server/pytest.ini
new file mode 100644
index 00000000000..519115d6a19
--- /dev/null
+++ b/mcp-server/pytest.ini
@@ -0,0 +1,32 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[pytest]
+testpaths = tests
+python_files = test_*.py
+python_classes = Test*
+python_functions = test_*
+addopts =
+ -v
+ --tb=short
+ --strict-markers
+ --disable-warnings
+asyncio_mode = auto
+markers =
+ slow: marks tests as slow (deselect with '-m "not slow"')
+ integration: marks tests as integration tests
+ unit: marks tests as unit tests
\ No newline at end of file
diff --git a/mcp-server/run_tests.sh b/mcp-server/run_tests.sh
new file mode 100755
index 00000000000..a47184cab9f
--- /dev/null
+++ b/mcp-server/run_tests.sh
@@ -0,0 +1,38 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+# Test script for Apache Cloudberry MCP Server
+
+echo "=== Install test dependencies ==="
+uv pip install -e ".[dev]"
+
+echo "=== Run all tests ==="
+uv run pytest tests/ -v
+
+echo "=== Run specific test patterns ==="
+echo "Run stdio mode test:"
+uv run pytest
tests/test_cbmcp.py::TestCloudberryMCPClient::test_list_capabilities -v
+
+echo "Run http mode test:"
+uv run pytest
tests/test_cbmcp.py::TestCloudberryMCPClient::test_list_capabilities -v
+
+echo "=== Run coverage tests ==="
+uv run pytest tests/ --cov=cbmcp --cov-report=html --cov-report=term
+
+echo "=== Test completed ==="
diff --git a/mcp-server/src/cbmcp/__init__.py b/mcp-server/src/cbmcp/__init__.py
new file mode 100644
index 00000000000..a584c415ee6
--- /dev/null
+++ b/mcp-server/src/cbmcp/__init__.py
@@ -0,0 +1,32 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+
+"""
+Apache Cloudberry MCP Server Package
+"""
+
+from .server import CloudberryMCPServer
+from .client import CloudberryMCPClient
+from .config import DatabaseConfig, ServerConfig
+from .database import DatabaseManager
+from .security import SQLValidator
+
+__version__ = "0.1.0"
+__all__ = [
+ "CloudberryMCPServer",
+ "CloudberryMCPClient",
+ "DatabaseConfig",
+ "ServerConfig",
+ "DatabaseManager",
+ "SQLValidator",
+]
\ No newline at end of file
diff --git a/mcp-server/src/cbmcp/__main__.py b/mcp-server/src/cbmcp/__main__.py
new file mode 100644
index 00000000000..f467fb2ac53
--- /dev/null
+++ b/mcp-server/src/cbmcp/__main__.py
@@ -0,0 +1,27 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Main entry point for the cbmcp package
+"""
+
+from .server import main
+
+if __name__ == "__main__":
+ main()
\ No newline at end of file
diff --git a/mcp-server/src/cbmcp/client.py b/mcp-server/src/cbmcp/client.py
new file mode 100644
index 00000000000..b88f9993bd1
--- /dev/null
+++ b/mcp-server/src/cbmcp/client.py
@@ -0,0 +1,158 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+MCP Client for testing the Apache Cloudberry MCP Server
+
+A client using the fastmcp SDK to interact with the Apache Cloudberry MCP
server implementation.
+"""
+
+from typing import Any, Dict, Optional
+from fastmcp import Client
+
+from .config import DatabaseConfig, ServerConfig
+from .server import CloudberryMCPServer
+
+class CloudberryMCPClient:
+ """MCP client for testing the Apache Cloudberry server using fastmcp SDK
+
+ Usage:
+ # Method 1: Using async context manager
+ async with CloudberryMCPClient() as client:
+ tools = await client.list_tools()
+ resources = await client.list_resources()
+
+ # Method 2: Using create class method
+ client = await CloudberryMCPClient.create()
+ tools = await client.list_tools()
+ await client.close()
+
+ # Method 3: Manual initialization
+ client = CloudberryMCPClient()
+ await client.initialize()
+ tools = await client.list_tools()
+ await client.close()
+ """
+
+ def __init__(self, mode: str = "stdio", server_url: str =
"http://localhost:8000/mcp/"):
+ self.mode = mode
+ self.server_url = server_url
+ self.client: Optional[Client] = None
+
+ @classmethod
+ async def create(cls, mode: str = "stdio", server_url: str =
"http://localhost:8000/mcp/") -> "CloudberryMCPClient":
+ """Asynchronously create and initialize the client"""
+ instance = cls(mode, server_url)
+ await instance.initialize()
+ return instance
+
+ async def initialize(self):
+ """Initialize the client connection"""
+ if self.mode == "stdio":
+ server_config = ServerConfig.from_env()
+ db_config = DatabaseConfig.from_env()
+ server = CloudberryMCPServer(server_config, db_config)
+ self.client = Client(server.mcp)
+ else:
+ self.client = Client(self.server_url)
+
+ await self.client.__aenter__()
+
+ async def close(self):
+ """Close the client connection"""
+ if self.client:
+ await self.client.__aexit__(None, None, None)
+ self.client = None
+
+ async def __aenter__(self):
+ if self.mode == "stdio":
+ server_config = ServerConfig.from_env()
+ db_config = DatabaseConfig.from_env()
+ server = CloudberryMCPServer(server_config, db_config)
+ self.client = Client(server.mcp)
+ else:
+ self.client = Client(self.server_url)
+
+ await self.client.__aenter__()
+ return self
+
+ async def __aexit__(self, exc_type, exc_val, exc_tb):
+ if self.client:
+ await self.client.__aexit__(exc_type, exc_val, exc_tb)
+
+ async def call_tool(self, tool_name: str, arguments: Dict[str, Any]) ->
Any:
+ """Call a tool on the MCP server"""
+ if not self.client:
+ raise RuntimeError("Client not initialized. Use async with
statement.")
+
+ return await self.client.call_tool(tool_name, arguments)
+
+ async def get_resource(self, resource_uri: str):
+ """Get a resource from the MCP server"""
+ if not self.client:
+ raise RuntimeError("Client not initialized. Use async with
statement.")
+
+ return await self.client.read_resource(resource_uri)
+
+ async def get_prompt(self, prompt_name: str, params: Dict[str, Any]=None):
+ """Get a prompt from the MCP server"""
+ if not self.client:
+ raise RuntimeError("Client not initialized. Use async with
statement.")
+
+ return await self.client.get_prompt(prompt_name, params)
+
+ async def list_tools(self) -> list:
+ """List available tools on the server"""
+ if not self.client:
+ raise RuntimeError("Client not initialized. Use async with
statement.")
+
+ return await self.client.list_tools()
+
+ async def list_resources(self) -> list:
+ """List available resources on the server"""
+ if not self.client:
+ raise RuntimeError("Client not initialized. Use async with
statement.")
+
+ return await self.client.list_resources()
+
+ async def list_prompts(self) -> list:
+ """List available prompts on the server"""
+ if not self.client:
+ raise RuntimeError("Client not initialized. Use async with
statement.")
+
+ return await self.client.list_prompts()
+
+
+if __name__ == "__main__":
+ import asyncio
+
+ async def main():
+ async with CloudberryMCPClient(mode="http") as client:
+ results = await client.call_tool("execute_query", {
+ "query": "SELECT * FROM film LIMIT 5"
+ })
+ print("Results:", results)
+
+ results = await client.call_tool("list_columns", {
+ "table": "film",
+ "schema": "public"
+ })
+ print("Columns:", results)
+
+ asyncio.run(main())
\ No newline at end of file
diff --git a/mcp-server/src/cbmcp/config.py b/mcp-server/src/cbmcp/config.py
new file mode 100644
index 00000000000..a810e67ed8f
--- /dev/null
+++ b/mcp-server/src/cbmcp/config.py
@@ -0,0 +1,73 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Configuration utilities for the Apache Cloudberry MCP server
+"""
+
+import os
+from dataclasses import dataclass
+from dotenv import load_dotenv
+
+
+@dataclass
+class DatabaseConfig:
+ """Database connection configuration"""
+ host: str
+ port: int
+ database: str
+ username: str
+ password: str
+
+ @classmethod
+ def from_env(cls) -> "DatabaseConfig":
+ """Create config from environment variables"""
+ load_dotenv()
+ return cls(
+ host=os.getenv("DB_HOST", "localhost"),
+ port=int(os.getenv("DB_PORT", "5432")),
+ database=os.getenv("DB_NAME", "postgres"),
+ username=os.getenv("DB_USER", "postgres"),
+ password=os.getenv("DB_PASSWORD", ""),
+ )
+
+ @property
+ def connection_string(self) -> str:
+ """Get Apache Cloudberry connection string"""
+ return
f"postgresql://{self.username}:{self.password}@{self.host}:{self.port}/{self.database}"
+
+
+@dataclass
+class ServerConfig:
+ """MCP server configuration"""
+ host: str
+ port: int
+ path: str
+ debug: bool
+
+ @classmethod
+ def from_env(cls) -> "ServerConfig":
+ """Create config from environment variables"""
+ load_dotenv()
+ return cls(
+ host=os.getenv("MCP_HOST", "localhost"),
+ port=int(os.getenv("MCP_PORT", "8000")),
+ path=os.getenv("MCP_PATH", "/mcp/"),
+ debug=os.getenv("MCP_DEBUG", "false").lower() == "true",
+ )
\ No newline at end of file
diff --git a/mcp-server/src/cbmcp/database.py b/mcp-server/src/cbmcp/database.py
new file mode 100644
index 00000000000..77cdd2b308b
--- /dev/null
+++ b/mcp-server/src/cbmcp/database.py
@@ -0,0 +1,773 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Database utilities for the Apache Cloudberry MCP server
+"""
+
+import logging
+from typing import Any, Dict, Optional
+from contextlib import asynccontextmanager
+import asyncpg
+
+from .config import DatabaseConfig
+from .security import SQLValidator
+
+
# Module-level logger; handlers and levels are configured by the hosting application.
logger = logging.getLogger(__name__)
+
+
class DatabaseManager:
    """Manages the asyncpg connection pool and all database operations."""

    def __init__(self, config: DatabaseConfig):
        self.config = config
        # Pool is created lazily on first use (see get_connection).
        self._connection_pool: Optional[asyncpg.Pool] = None

    @asynccontextmanager
    async def get_connection(self):
        """Yield a pooled connection, creating the pool on first use."""
        if not self._connection_pool:
            self._connection_pool = await asyncpg.create_pool(
                host=self.config.host,
                port=self.config.port,
                database=self.config.database,
                user=self.config.username,
                password=self.config.password,
                min_size=1,
                max_size=10,
                command_timeout=60.0,
            )

        try:
            async with self._connection_pool.acquire() as conn:
                yield conn
        except Exception as e:
            logger.error(f"Database connection error: {e}")
            raise

    async def execute_query(
        self,
        query: str,
        params: Optional[Dict[str, Any]] = None,
        readonly: bool = True
    ) -> Dict[str, Any]:
        """Execute a SQL query with safety validation.

        Args:
            query: SQL text using asyncpg positional placeholders ($1..$n).
            params: Optional parameter values; passed positionally in dict
                insertion order onto $1..$n.
            readonly: When True, reject anything that is not a read query.

        Returns:
            ``{"columns": [...], "rows": [...], "row_count": N}`` on
            success, or ``{"error": message}`` on failure.
        """
        # Validate query for security before touching the database.
        is_valid, error_msg = SQLValidator.validate_query(query)
        if not is_valid:
            return {"error": f"Query validation failed: {error_msg}"}

        if readonly and not SQLValidator.is_readonly_query(query):
            return {"error": "Only read-only queries are allowed"}

        try:
            async with self.get_connection() as conn:
                if params:
                    # Bug fix: asyncpg's fetch() only accepts positional
                    # arguments for $n placeholders; the previous code
                    # passed **kwargs, which raised TypeError on every
                    # parameterized call.
                    result = await conn.fetch(query, *params.values())
                else:
                    result = await conn.fetch(query)

                if not result:
                    return {"columns": [], "rows": [], "row_count": 0}

                columns = list(result[0].keys())
                rows = [list(row.values()) for row in result]
                return {
                    "columns": columns,
                    "rows": rows,
                    "row_count": len(rows),
                }
        except Exception as e:
            logger.error(f"Query execution error: {e}")
            return {"error": f"Error executing query: {str(e)}"}

    async def get_table_info(self, schema: str, table: str) -> Dict[str, Any]:
        """Return columns, indexes and size statistics for a table."""
        try:
            async with self.get_connection() as conn:
                # Column definitions in declaration order.
                columns = await conn.fetch(
                    "SELECT column_name, data_type, is_nullable, column_default "
                    "FROM information_schema.columns "
                    "WHERE table_schema = $1 AND table_name = $2 "
                    "ORDER BY ordinal_position",
                    schema, table
                )

                # Index names and their CREATE INDEX definitions.
                indexes = await conn.fetch(
                    "SELECT indexname, indexdef FROM pg_indexes "
                    "WHERE schemaname = $1 AND tablename = $2 "
                    "ORDER BY indexname",
                    schema, table
                )

            # Bug fix: the original statistics query used "FROM $1", which
            # is invalid SQL (identifiers cannot be bound parameters) and
            # made this method fail at runtime. Delegate to the validated
            # get_table_stats() implementation instead.
            stats = await self.get_table_stats(schema, table)
            return {
                "columns": [dict(col) for col in columns],
                "indexes": [dict(idx) for idx in indexes],
                "statistics": {} if "error" in stats else stats,
            }
        except Exception as e:
            logger.error(f"Error getting table info: {e}")
            return {"error": str(e)}

    async def close(self):
        """Close the connection pool (idempotent)."""
        if self._connection_pool:
            await self._connection_pool.close()
            self._connection_pool = None

    async def list_schemas(self) -> list[str]:
        """List all user-visible database schemas."""
        async with self.get_connection() as conn:
            records = await conn.fetch(
                "SELECT schema_name FROM information_schema.schemata "
                "WHERE schema_name NOT LIKE 'pg_%' AND schema_name != 'information_schema' "
                "ORDER BY schema_name"
            )
            return [r["schema_name"] for r in records]

    async def get_database_info(self) -> dict[str, str]:
        """Get general database information (version, size, table count)."""
        async with self.get_connection() as conn:
            version = await conn.fetchval("SELECT version()")
            size = await conn.fetchval(
                "SELECT pg_size_pretty(pg_database_size(current_database()))"
            )
            stats = await conn.fetchrow(
                "SELECT COUNT(*) as total_tables FROM information_schema.tables "
                "WHERE table_type = 'BASE TABLE' AND table_schema NOT LIKE 'pg_%'"
            )

            return {
                "Version": version,
                "Size": size,
                "Total Tables": str(stats['total_tables'])
            }

    async def get_database_summary(self) -> dict[str, dict]:
        """Get a per-schema summary of tables and views."""
        summary = {}

        async with self.get_connection() as conn:
            schemas = await conn.fetch(
                "SELECT schema_name FROM information_schema.schemata "
                "WHERE schema_name NOT LIKE 'pg_%' AND schema_name != 'information_schema' "
                "ORDER BY schema_name"
            )

            for schema_row in schemas:
                schema = schema_row["schema_name"]
                summary[schema] = {}

                tables = await conn.fetch(
                    "SELECT table_name FROM information_schema.tables "
                    "WHERE table_schema = $1 AND table_type = 'BASE TABLE' "
                    "ORDER BY table_name",
                    schema
                )
                summary[schema]["tables"] = [t["table_name"] for t in tables]

                views = await conn.fetch(
                    "SELECT table_name FROM information_schema.views "
                    "WHERE table_schema = $1 "
                    "ORDER BY table_name",
                    schema
                )
                summary[schema]["views"] = [v["table_name"] for v in views]

        return summary

    async def list_tables(self, schema: str) -> list[str]:
        """List base tables in a specific schema."""
        async with self.get_connection() as conn:
            records = await conn.fetch(
                "SELECT table_name FROM information_schema.tables "
                "WHERE table_schema = $1 AND table_type = 'BASE TABLE' "
                "ORDER BY table_name",
                schema
            )
            return [r["table_name"] for r in records]

    async def list_views(self, schema: str) -> list[str]:
        """List views in a specific schema."""
        async with self.get_connection() as conn:
            records = await conn.fetch(
                "SELECT table_name FROM information_schema.views "
                "WHERE table_schema = $1 "
                "ORDER BY table_name",
                schema
            )
            return [r["table_name"] for r in records]

    async def list_indexes(self, schema: str, table: str) -> list[dict]:
        """List indexes (name and definition) for a specific table."""
        async with self.get_connection() as conn:
            records = await conn.fetch(
                "SELECT indexname, indexdef FROM pg_indexes "
                "WHERE schemaname = $1 AND tablename = $2 "
                "ORDER BY indexname",
                schema, table
            )
            return [{"indexname": r["indexname"], "indexdef": r["indexdef"]} for r in records]

    async def list_columns(self, schema: str, table: str) -> list[dict]:
        """List columns for a specific table, in declaration order."""
        async with self.get_connection() as conn:
            records = await conn.fetch(
                "SELECT column_name, data_type, is_nullable, column_default "
                "FROM information_schema.columns "
                "WHERE table_schema = $1 AND table_name = $2 "
                "ORDER BY ordinal_position",
                schema, table
            )
            return [
                {
                    "column_name": r["column_name"],
                    "data_type": r["data_type"],
                    "is_nullable": r["is_nullable"],
                    "column_default": r["column_default"]
                }
                for r in records
            ]

    async def get_table_stats(self, schema: str, table: str) -> dict[str, Any]:
        """Get size and row-count statistics for a table.

        NOTE: row_count comes from a COUNT(*), which scans the table; it
        may be slow on very large tables.
        """
        try:
            # Identifiers cannot be bound parameters, so string
            # interpolation is unavoidable here; restrict names to
            # alphanumerics plus '_'/'-' first to prevent SQL injection.
            if not schema.replace('_', '').replace('-', '').isalnum():
                return {"error": "Invalid schema name"}
            if not table.replace('_', '').replace('-', '').isalnum():
                return {"error": "Invalid table name"}

            async with self.get_connection() as conn:
                qualified_name = f"{schema}.{table}"
                sql = (
                    f"SELECT "
                    f"pg_size_pretty(pg_total_relation_size('{qualified_name}')) as total_size, "
                    f"pg_size_pretty(pg_relation_size('{qualified_name}')) as table_size, "
                    f"pg_size_pretty(pg_total_relation_size('{qualified_name}') - pg_relation_size('{qualified_name}')) as indexes_size, "
                    f"(SELECT COUNT(*) FROM {qualified_name}) as row_count"
                )
                result = await conn.fetchrow(sql)

                if not result:
                    return {"error": f"Table {schema}.{table} not found"}

                return {
                    "total_size": result["total_size"],
                    "table_size": result["table_size"],
                    "indexes_size": result["indexes_size"],
                    "row_count": result["row_count"]
                }
        except Exception as e:
            return {"error": f"Error getting table stats: {str(e)}"}

    async def list_large_tables(self, limit: int = 10) -> list[dict]:
        """List the largest tables in the database, biggest first."""
        async with self.get_connection() as conn:
            result = await conn.fetch(
                "SELECT "
                "schemaname, tablename, "
                "pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) as size, "
                "pg_total_relation_size(schemaname||'.'||tablename) as size_bytes "
                "FROM pg_tables "
                "WHERE schemaname NOT LIKE 'pg_%' "
                "ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC "
                "LIMIT $1",
                limit
            )

            return [
                {
                    "schema": row["schemaname"],
                    "table": row["tablename"],
                    "size": row["size"],
                    "size_bytes": row["size_bytes"]
                }
                for row in result
            ]

    async def list_users(self) -> list[str]:
        """List database users.

        NOTE(review): the 'cloudberry' superuser is filtered out by a
        hard-coded name — confirm this matches the deployment's admin role.
        """
        async with self.get_connection() as conn:
            records = await conn.fetch(
                "SELECT usename FROM pg_user WHERE usename != 'cloudberry' "
                "ORDER BY usename"
            )
            return [r["usename"] for r in records]

    async def list_user_permissions(self, username: str) -> list[dict]:
        """List object-level ACL entries granted to a specific user."""
        async with self.get_connection() as conn:
            records = await conn.fetch(
                "SELECT "
                "n.nspname as schema_name, "
                "c.relname as object_name, "
                "c.relkind as object_type, "
                "p.perm as permission "
                "FROM pg_class c "
                "JOIN pg_namespace n ON n.oid = c.relnamespace "
                "CROSS JOIN LATERAL aclexplode(c.relacl) p "
                "WHERE p.grantee = (SELECT oid FROM pg_user WHERE usename = $1) "
                "AND n.nspname NOT LIKE 'pg_%' "
                "ORDER BY n.nspname, c.relname",
                username
            )
            return [
                {
                    "schema": r["schema_name"],
                    "object": r["object_name"],
                    "type": r["object_type"],
                    "permission": r["permission"]
                }
                for r in records
            ]

    async def list_table_privileges(self, schema: str, table: str) -> list[dict]:
        """List grantee/privilege pairs for a specific table."""
        async with self.get_connection() as conn:
            records = await conn.fetch(
                "SELECT "
                "grantee, privilege_type "
                "FROM information_schema.table_privileges "
                "WHERE table_schema = $1 AND table_name = $2 "
                "ORDER BY grantee, privilege_type",
                schema, table
            )
            return [
                {
                    "user": r["grantee"],
                    "privilege": r["privilege_type"]
                }
                for r in records
            ]

    async def list_constraints(self, schema: str, table: str) -> list[dict]:
        """List constraints for a table, with human-readable type names."""
        async with self.get_connection() as conn:
            records = await conn.fetch(
                "SELECT "
                "c.conname as constraint_name, "
                "c.contype as constraint_type, "
                "pg_get_constraintdef(c.oid) as constraint_definition, "
                "f.relname as foreign_table_name, "
                "nf.nspname as foreign_schema_name "
                "FROM pg_constraint c "
                "JOIN pg_class t ON t.oid = c.conrelid "
                "JOIN pg_namespace n ON n.oid = t.relnamespace "
                "LEFT JOIN pg_class f ON f.oid = c.confrelid "
                "LEFT JOIN pg_namespace nf ON nf.oid = f.relnamespace "
                "WHERE n.nspname = $1 AND t.relname = $2 "
                "ORDER BY c.conname",
                schema, table
            )
            constraints = []
            for r in records:
                # Translate pg_constraint.contype codes to readable labels.
                constraint_type = {
                    'p': 'PRIMARY KEY',
                    'f': 'FOREIGN KEY',
                    'u': 'UNIQUE',
                    'c': 'CHECK',
                    'x': 'EXCLUSION'
                }.get(r["constraint_type"], r["constraint_type"])

                constraints.append({
                    "name": r["constraint_name"],
                    "type": constraint_type,
                    "definition": r["constraint_definition"],
                    "foreign_table": r["foreign_table_name"],
                    "foreign_schema": r["foreign_schema_name"]
                })
            return constraints

    async def list_foreign_keys(self, schema: str, table: str) -> list[dict]:
        """List foreign keys declared on a specific table."""
        async with self.get_connection() as conn:
            records = await conn.fetch(
                "SELECT "
                "tc.constraint_name, "
                "tc.table_name, "
                "kcu.column_name, "
                "ccu.table_name AS foreign_table_name, "
                "ccu.column_name AS foreign_column_name, "
                "ccu.table_schema AS foreign_schema_name "
                "FROM information_schema.table_constraints AS tc "
                "JOIN information_schema.key_column_usage AS kcu "
                "ON tc.constraint_name = kcu.constraint_name "
                "JOIN information_schema.constraint_column_usage AS ccu "
                "ON ccu.constraint_name = tc.constraint_name "
                "WHERE tc.constraint_type = 'FOREIGN KEY' "
                "AND tc.table_schema = $1 AND tc.table_name = $2 "
                "ORDER BY tc.constraint_name",
                schema, table
            )
            return [
                {
                    "constraint_name": r["constraint_name"],
                    "column": r["column_name"],
                    "foreign_schema": r["foreign_schema_name"],
                    "foreign_table": r["foreign_table_name"],
                    "foreign_column": r["foreign_column_name"]
                }
                for r in records
            ]

    async def list_referenced_tables(self, schema: str, table: str) -> list[dict]:
        """List tables whose foreign keys reference this table."""
        async with self.get_connection() as conn:
            records = await conn.fetch(
                "SELECT "
                "tc.table_schema, "
                "tc.table_name, "
                "tc.constraint_name "
                "FROM information_schema.table_constraints AS tc "
                "JOIN information_schema.constraint_column_usage AS ccu "
                "ON ccu.constraint_name = tc.constraint_name "
                "WHERE tc.constraint_type = 'FOREIGN KEY' "
                "AND ccu.table_schema = $1 AND ccu.table_name = $2 "
                "ORDER BY tc.table_schema, tc.table_name",
                schema, table
            )
            return [
                {
                    "schema": r["table_schema"],
                    "table": r["table_name"],
                    "constraint": r["constraint_name"]
                }
                for r in records
            ]

    async def explain_query(self, query: str, params: Optional[dict] = None) -> str:
        """Get the EXPLAIN (ANALYZE, BUFFERS) plan for a query.

        NOTE: ANALYZE executes the query; only validated read-only
        queries should reach this method.
        """
        try:
            async with self.get_connection() as conn:
                if params:
                    # Bug fix: parameters must be positional for asyncpg
                    # (the original passed **kwargs, which raised TypeError).
                    result = await conn.fetch(
                        f"EXPLAIN (ANALYZE, BUFFERS) {query}", *params.values()
                    )
                else:
                    result = await conn.fetch(f"EXPLAIN (ANALYZE, BUFFERS) {query}")

                return "\n".join([row["QUERY PLAN"] for row in result])
        except Exception as e:
            return f"Error explaining query: {str(e)}"

    async def get_slow_queries(self, limit: int = 10) -> list[dict]:
        """Get slow queries from pg_stat_statements.

        NOTE(review): uses the pre-PG13 column names total_time/mean_time;
        confirm against the Cloudberry pg_stat_statements version. Returns
        an empty list when the extension is unavailable.
        """
        async with self.get_connection() as conn:
            try:
                records = await conn.fetch(
                    "SELECT "
                    "query, "
                    "calls, "
                    "total_time, "
                    "mean_time, "
                    "rows "
                    "FROM pg_stat_statements "
                    "ORDER BY mean_time DESC "
                    "LIMIT $1",
                    limit
                )
                return [
                    {
                        "query": r["query"],
                        "calls": r["calls"],
                        "total_time": r["total_time"],
                        "mean_time": r["mean_time"],
                        "rows": r["rows"]
                    }
                    for r in records
                ]
            except Exception:
                # pg_stat_statements might not be installed/enabled.
                return []

    async def get_index_usage(self) -> list[dict]:
        """Get index usage statistics, most-scanned first."""
        async with self.get_connection() as conn:
            records = await conn.fetch(
                "SELECT "
                "schemaname, "
                "relname as tablename, "
                "indexrelname as indexname, "
                "idx_scan, "
                "idx_tup_read, "
                "idx_tup_fetch "
                "FROM pg_stat_user_indexes "
                "WHERE schemaname NOT LIKE 'pg_%' "
                "ORDER BY idx_scan DESC"
            )
            return [
                {
                    "schema": r["schemaname"],
                    "table": r["tablename"],
                    "index": r["indexname"],
                    "scans": r["idx_scan"],
                    "tup_read": r["idx_tup_read"],
                    "tup_fetch": r["idx_tup_fetch"]
                }
                for r in records
            ]

    async def get_table_bloat_info(self) -> list[dict]:
        """Approximate table bloat from dead-tuple statistics.

        Bug fix: the original query read a non-existent pg_class column
        ("fillfactor") and could divide by zero on empty tables, so it
        always failed at runtime. Estimate bloat as the dead-tuple share
        reported by pg_stat_user_tables instead.
        """
        async with self.get_connection() as conn:
            records = await conn.fetch(
                "SELECT "
                "s.schemaname, "
                "s.relname as tablename, "
                "pg_size_pretty(pg_total_relation_size(s.relid)) as total_size, "
                "round(100.0 * s.n_dead_tup / NULLIF(s.n_live_tup + s.n_dead_tup, 0), 2) as bloat_ratio "
                "FROM pg_stat_user_tables s "
                "WHERE s.schemaname NOT LIKE 'pg_%' "
                "ORDER BY bloat_ratio DESC NULLS LAST "
                "LIMIT 20"
            )
            return [
                {
                    "schema": r["schemaname"],
                    "table": r["tablename"],
                    "total_size": r["total_size"],
                    "bloat_ratio": r["bloat_ratio"]
                }
                for r in records
            ]

    async def get_database_activity(self) -> list[dict]:
        """Get currently non-idle backend sessions."""
        async with self.get_connection() as conn:
            records = await conn.fetch(
                "SELECT "
                "pid, "
                "usename, "
                "application_name, "
                "client_addr, "
                "state, "
                "query_start, "
                "query "
                "FROM pg_stat_activity "
                "WHERE state != 'idle' AND usename != 'cloudberry' "
                "ORDER BY query_start"
            )
            return [
                {
                    "pid": r["pid"],
                    "username": r["usename"],
                    "application": r["application_name"],
                    "client_addr": str(r["client_addr"]) if r["client_addr"] else None,
                    "state": r["state"],
                    "query_start": str(r["query_start"]),
                    "query": r["query"]
                }
                for r in records
            ]

    async def list_functions(self, schema: str) -> list[dict]:
        """List functions and procedures in a specific schema."""
        async with self.get_connection() as conn:
            records = await conn.fetch(
                "SELECT "
                "proname as function_name, "
                "pg_get_function_identity_arguments(p.oid) as arguments, "
                "pg_get_function_result(p.oid) as return_type, "
                "prokind as function_type "
                "FROM pg_proc p "
                "JOIN pg_namespace n ON n.oid = p.pronamespace "
                "WHERE n.nspname = $1 AND p.prokind IN ('f', 'p') "
                "ORDER BY proname",
                schema
            )
            return [
                {
                    "name": r["function_name"],
                    "arguments": r["arguments"],
                    "return_type": r["return_type"],
                    # prokind: 'f' = function, 'p' = procedure.
                    "type": "function" if r["function_type"] == "f" else "procedure"
                }
                for r in records
            ]

    async def get_function_definition(self, schema: str, function_name: str) -> str:
        """Get the full CREATE FUNCTION source for a function.

        Only the first overload is returned when multiple exist.
        """
        try:
            async with self.get_connection() as conn:
                definition = await conn.fetchval(
                    "SELECT pg_get_functiondef(p.oid) "
                    "FROM pg_proc p "
                    "JOIN pg_namespace n ON n.oid = p.pronamespace "
                    "WHERE n.nspname = $1 AND p.proname = $2 "
                    "LIMIT 1",
                    schema, function_name
                )
                return definition or "Function definition not found"
        except Exception as e:
            return f"Error getting function definition: {str(e)}"

    async def list_triggers(self, schema: str, table: str) -> list[dict]:
        """List triggers defined on a specific table."""
        async with self.get_connection() as conn:
            records = await conn.fetch(
                "SELECT "
                "trigger_name, "
                "event_manipulation, "
                "action_timing, "
                "action_statement "
                "FROM information_schema.triggers "
                "WHERE event_object_schema = $1 AND event_object_table = $2 "
                "ORDER BY trigger_name",
                schema, table
            )
            return [
                {
                    "name": r["trigger_name"],
                    "event": r["event_manipulation"],
                    "timing": r["action_timing"],
                    "action": r["action_statement"]
                }
                for r in records
            ]

    async def get_table_ddl(self, schema: str, table: str) -> str:
        """Get a DDL statement for a table.

        Tries Cloudberry/Greenplum's pg_get_tabledef() first, then falls
        back to reconstructing a CREATE TABLE from the information schema
        (columns only — constraints/indexes are not included).
        """
        try:
            async with self.get_connection() as conn:
                try:
                    ddl = await conn.fetchval(
                        "SELECT pg_get_tabledef($1, $2, true)",
                        schema, table
                    )
                    if ddl:
                        return ddl
                except Exception:
                    # pg_get_tabledef() is not available on plain PostgreSQL.
                    pass

                ddl_query = """
                SELECT 'CREATE TABLE ' || $1 || '.' || $2 || ' (' || E'\n' ||
                string_agg(
                    '    ' || column_name || ' ' ||
                    data_type ||
                    CASE WHEN character_maximum_length IS NOT NULL
                        THEN '(' || character_maximum_length || ')'
                        ELSE '' END ||
                    CASE WHEN is_nullable = 'NO' THEN ' NOT NULL' ELSE '' END ||
                    CASE WHEN column_default IS NOT NULL
                        THEN ' DEFAULT ' || column_default
                        ELSE '' END,
                    E',\n' ORDER BY ordinal_position
                ) || E'\n);' as ddl
                FROM information_schema.columns
                WHERE table_schema = $1 AND table_name = $2
                """
                ddl = await conn.fetchval(ddl_query, schema, table)
                return ddl or "Table DDL not found"
        except Exception as e:
            return f"Error getting table DDL: {str(e)}"

    async def list_materialized_views(self, schema: str) -> list[str]:
        """List materialized views in a specific schema."""
        async with self.get_connection() as conn:
            records = await conn.fetch(
                "SELECT matviewname "
                "FROM pg_matviews "
                "WHERE schemaname = $1 "
                "ORDER BY matviewname",
                schema
            )
            return [r["matviewname"] for r in records]

    async def get_vacuum_info(self, schema: str, table: str) -> dict:
        """Get vacuum/autovacuum statistics for a table."""
        async with self.get_connection() as conn:
            record = await conn.fetchrow(
                "SELECT "
                "last_vacuum, "
                "last_autovacuum, "
                "n_dead_tup, "
                "n_live_tup, "
                "vacuum_count, "
                "autovacuum_count "
                "FROM pg_stat_user_tables "
                "WHERE schemaname = $1 AND relname = $2",
                schema, table
            )
            if record:
                return {
                    "last_vacuum": str(record["last_vacuum"]) if record["last_vacuum"] else None,
                    "last_autovacuum": str(record["last_autovacuum"]) if record["last_autovacuum"] else None,
                    "dead_tuples": record["n_dead_tup"],
                    "live_tuples": record["n_live_tup"],
                    "vacuum_count": record["vacuum_count"],
                    "autovacuum_count": record["autovacuum_count"]
                }
            return {"error": "Table not found"}

    async def list_active_connections(self) -> list[dict]:
        """List active database connections (excluding the admin role)."""
        async with self.get_connection() as conn:
            records = await conn.fetch(
                "SELECT "
                "pid, "
                "usename, "
                "application_name, "
                "client_addr, "
                "state, "
                "backend_start, "
                "query_start "
                "FROM pg_stat_activity "
                "WHERE usename != 'cloudberry' "
                "ORDER BY backend_start"
            )
            return [
                {
                    "pid": r["pid"],
                    "username": r["usename"],
                    "application": r["application_name"],
                    "client_addr": str(r["client_addr"]) if r["client_addr"] else None,
                    "state": r["state"],
                    "backend_start": str(r["backend_start"]),
                    "query_start": str(r["query_start"]) if r["query_start"] else None
                }
                for r in records
            ]
\ No newline at end of file
diff --git a/mcp-server/src/cbmcp/prompt.py b/mcp-server/src/cbmcp/prompt.py
new file mode 100644
index 00000000000..774aabac6ba
--- /dev/null
+++ b/mcp-server/src/cbmcp/prompt.py
@@ -0,0 +1,62 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Prompt templates for Apache Cloudberry database analysis."""
+
+ANALYZE_QUERY_PERFORMANCE_PROMPT = """Please help me analyze and optimize a
PostgreSQL query.
+
+I'll provide you with:
+1. The SQL query: {sql}
+2. The EXPLAIN ANALYZE output: {explain}
+3. Table schema information: {table_info}
+
+Please analyze:
+- Query execution plan
+- Potential performance bottlenecks
+- Index usage
+- Suggested optimizations
+- Alternative query approaches
+"""
+
+SUGGEST_INDEXES_PROMPT = """Please help me suggest optimal indexes for your
PostgreSQL tables.
+
+I'll provide you with:
+1. The table schema(s): {table_info}
+2. Common query patterns: {query}
+3. Current indexes: {table_info}
+4. Table size and row count: {table_stats}
+
+Please analyze:
+- Missing indexes based on query patterns
+- Index type recommendations (B-tree, GIN, GiST, etc.)
+- Composite index suggestions
+- Index maintenance considerations
+"""
+
+DATABASE_HEALTH_CHECK_PROMPT = """Let's perform a comprehensive health check
of your PostgreSQL database.
+
+Please analyze:
+- Database size and growth trends
+- Large tables and indexes
+- Query performance metrics
+- Connection pool usage
+- Vacuum and analyze statistics
+- Index fragmentation
+- Table bloat
+"""
\ No newline at end of file
diff --git a/mcp-server/src/cbmcp/security.py b/mcp-server/src/cbmcp/security.py
new file mode 100644
index 00000000000..d9cd44a7e17
--- /dev/null
+++ b/mcp-server/src/cbmcp/security.py
@@ -0,0 +1,96 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Security utilities for the Apache Cloudberry MCP server
+"""
+
+from typing import Set
+import re
+
+
class SQLValidator:
    """Validates SQL queries for security before execution."""

    # Statement types the server will execute (read-only / introspection).
    ALLOWED_OPERATIONS: Set[str] = {
        "SELECT", "WITH", "SHOW", "EXPLAIN", "DESCRIBE", "PRAGMA"
    }

    # Mutating / DDL statements that are always rejected.
    BLOCKED_OPERATIONS: Set[str] = {
        "INSERT", "UPDATE", "DELETE", "DROP", "CREATE", "ALTER",
        "TRUNCATE", "GRANT", "REVOKE", "REPLACE"
    }

    # Catalog objects that expose credentials; never queryable ad hoc.
    SENSITIVE_TABLES: Set[str] = {
        "pg_user", "pg_shadow", "pg_authid", "pg_passfile",
        "information_schema.user_privileges"
    }

    @classmethod
    def validate_query(cls, query: str) -> tuple[bool, str]:
        """Validate a SQL query for security.

        Returns:
            tuple: (is_valid, error_message)
        """
        query_upper = query.upper().strip()

        # Reject any statement containing a blocked keyword as a whole word.
        for blocked in cls.BLOCKED_OPERATIONS:
            if re.search(rf"\b{blocked}\b", query_upper):
                return False, f"Blocked SQL operation: {blocked}"

        # The statement must begin with an allow-listed operation.
        if not any(query_upper.startswith(op) for op in cls.ALLOWED_OPERATIONS):
            return False, f"Query must start with one of: {', '.join(cls.ALLOWED_OPERATIONS)}"

        # Bug fix: the sensitive-table names are lower-case while the query
        # had been upper-cased, so this check could never match. Match
        # case-insensitively against the raw query, and escape the '.' in
        # qualified names (a regex metacharacter in the original).
        for sensitive_table in cls.SENSITIVE_TABLES:
            if re.search(rf"\b{re.escape(sensitive_table)}\b", query, re.IGNORECASE):
                return False, f"Access to sensitive table not allowed: {sensitive_table}"

        # Heuristic SQL-injection patterns (matched on the upper-cased text).
        injection_patterns = [
            r";.*--",          # Comments after statements
            r"/\*.*\*/",       # Block comments
            r"'OR'1'='1",      # Basic SQL injection
            r"'UNION.*SELECT", # Union attacks
            r"EXEC\s*\(",      # Dynamic SQL execution
        ]

        for pattern in injection_patterns:
            if re.search(pattern, query_upper):
                return False, "Potential SQL injection detected"

        return True, "Query is valid"

    @classmethod
    def sanitize_parameter_name(cls, param_name: str) -> str:
        """Sanitize a parameter name to prevent injection.

        Strips every character that is not alphanumeric or underscore.
        """
        return re.sub(r"[^a-zA-Z0-9_]", "", param_name)

    @classmethod
    def is_readonly_query(cls, query: str) -> bool:
        """Check whether a query is read-only (SELECT/WITH/SHOW/EXPLAIN)."""
        query_upper = query.upper().strip()
        return query_upper.startswith(("SELECT", "WITH", "SHOW", "EXPLAIN"))
\ No newline at end of file
diff --git a/mcp-server/src/cbmcp/server.py b/mcp-server/src/cbmcp/server.py
new file mode 100644
index 00000000000..edf0b539545
--- /dev/null
+++ b/mcp-server/src/cbmcp/server.py
@@ -0,0 +1,551 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Apache Cloudberry MCP Server Implementation
+
+A Model Context Protocol (MCP) server for Apache Cloudberry database
interaction
+providing resources, tools, and prompts for database management.
+"""
+
+from typing import Annotated, Any, Dict, List, Optional
+import logging
+from fastmcp import FastMCP
+from pydantic import Field
+
+from .config import DatabaseConfig, ServerConfig
+from .database import DatabaseManager
+from .prompt import (
+ ANALYZE_QUERY_PERFORMANCE_PROMPT,
+ SUGGEST_INDEXES_PROMPT,
+ DATABASE_HEALTH_CHECK_PROMPT
+)
+
+logger = logging.getLogger(__name__)
+
+class CloudberryMCPServer:
+ """Apache Cloudberry MCP Server implementation"""
+
+ def __init__(self, server_config: ServerConfig, db_config: DatabaseConfig):
+ self.server_config = server_config
+ self.db_config = db_config
+ self.mcp = FastMCP("Apache Cloudberry MCP Server")
+ self.db_manager = DatabaseManager(db_config)
+
+ self._setup_resources()
+ self._setup_tools()
+ self._setup_prompts()
+
+
    def _setup_resources(self):
        """Setup MCP resources for database metadata.

        Each decorated closure is registered with FastMCP; its function name
        and docstring become the resource metadata shown to MCP clients, so
        they must not be renamed casually.  Unlike the tools below, these
        resources let exceptions propagate to the framework.
        """

        @self.mcp.resource("postgres://schemas", mime_type="application/json")
        async def list_schemas() -> List[str]:
            """List all database schemas"""
            logger.info("Listing schemas")
            return await self.db_manager.list_schemas()

        @self.mcp.resource("postgres://database/info", mime_type="application/json")
        async def database_info() -> Dict[str, str]:
            """Get general database information"""
            logger.info("Getting database info")
            return await self.db_manager.get_database_info()

        @self.mcp.resource("postgres://database/summary", mime_type="application/json")
        async def database_summary() -> Dict[str, dict]:
            """Get comprehensive database summary"""
            logger.info("Getting database summary")
            return await self.db_manager.get_database_summary()
+
+
    def _setup_tools(self):
        """Setup MCP tools for database operations.

        Each decorated closure is registered with FastMCP; its function name
        becomes the tool name and its docstring the tool description exposed
        to MCP clients, so neither should be renamed without coordinating a
        client-facing change.

        NOTE(review): on failure most tools return an error string (or dict)
        instead of raising, which does not match several declared return
        annotations (e.g. ``List[str]``).  Presumably this is intentional so
        LLM clients receive a readable message rather than a protocol error —
        confirm before tightening the annotations.
        """

        @self.mcp.tool()
        async def list_tables(
            schema: Annotated[str, Field(description="The schema name to list tables from")]
        ) -> List[str]:
            """List tables in a specific schema"""
            logger.info(f"Listing tables in schema: {schema}")
            try:
                return await self.db_manager.list_tables(schema)
            except Exception as e:
                return f"Error listing tables: {str(e)}"

        @self.mcp.tool()
        async def list_views(
            schema: Annotated[str, Field(description="The schema name to list views from")]
        ) -> List[str]:
            """List views in a specific schema"""
            logger.info(f"Listing views in schema: {schema}")
            try:
                return await self.db_manager.list_views(schema)
            except Exception as e:
                return f"Error listing views: {str(e)}"

        @self.mcp.tool()
        async def list_indexes(
            schema: Annotated[str, Field(description="The schema name")],
            table: Annotated[str, Field(description="The table name to list indexes for")]
        ) -> List[str]:
            """List indexes for a specific table"""
            logger.info(f"Listing indexes for table: {schema}.{table}")
            try:
                indexes = await self.db_manager.list_indexes(schema, table)
                # Flatten each index record to a "name: definition" string.
                return [f"{idx['indexname']}: {idx['indexdef']}" for idx in indexes]
            except Exception as e:
                return f"Error listing indexes: {str(e)}"

        @self.mcp.tool()
        async def list_columns(
            schema: Annotated[str, Field(description="The schema name")],
            table: Annotated[str, Field(description="The table name to list columns for")]
        ) -> List[Dict[str, Any]]:
            """List columns for a specific table"""
            logger.info(f"Listing columns for table: {schema}.{table}")
            try:
                return await self.db_manager.list_columns(schema, table)
            except Exception as e:
                return f"Error listing columns: {str(e)}"

        @self.mcp.tool()
        async def execute_query(
            query: Annotated[str, Field(description="The SQL query to execute")],
            params: Annotated[Optional[Dict[str, Any]], Field(description="The parameters for the query")] = None,
            readonly: Annotated[bool, Field(description="Whether the query is read-only")] = True
        ) -> Dict[str, Any]:
            """
            Execute a safe SQL query with parameters
            """
            logger.info(f"Executing query: {query}")
            try:
                return await self.db_manager.execute_query(query, params, readonly)
            except Exception as e:
                return {"error": f"Error executing query: {str(e)}"}

        @self.mcp.tool()
        async def explain_query(
            query: Annotated[str, Field(description="The SQL query to explain")],
            params: Annotated[Optional[Dict[str, Any]], Field(description="The parameters for the query")] = None
        ) -> str:
            """
            Get the execution plan for a query
            """
            logger.info(f"Explaining query: {query}")
            try:
                return await self.db_manager.explain_query(query, params)
            except Exception as e:
                return f"Error explaining query: {str(e)}"

        @self.mcp.tool()
        async def get_table_stats(
            schema: Annotated[str, Field(description="The schema name")],
            table: Annotated[str, Field(description="The table name")],
        ) -> Dict[str, Any]:
            """
            Get statistics for a table
            """
            logger.info(f"Getting table stats for: {schema}.{table}")
            try:
                result = await self.db_manager.get_table_stats(schema, table)
                # NOTE(review): unwraps the error message (a plain string)
                # rather than returning the dict the annotation promises.
                if "error" in result:
                    return result["error"]
                return result
            except Exception as e:
                return f"Error getting table stats: {str(e)}"

        @self.mcp.tool()
        async def list_large_tables(limit: Annotated[int, Field(description="Number of tables to return")] = 10) -> List[Dict[str, Any]]:
            """
            List the largest tables in the database
            """
            logger.info(f"Listing large tables, limit: {limit}")
            try:
                return await self.db_manager.list_large_tables(limit)
            except Exception as e:
                return f"Error listing large tables: {str(e)}"

        @self.mcp.tool()
        async def get_database_schemas() -> List[str]:
            """Get database schemas"""
            logger.info("Getting database schemas")
            try:
                return await self.db_manager.list_schemas()
            except Exception as e:
                return f"Error getting schemas: {str(e)}"

        @self.mcp.tool()
        async def get_database_information() -> Dict[str, str]:
            """Get general database information"""
            logger.info("Getting database information")
            try:
                return await self.db_manager.get_database_info()
            except Exception as e:
                return f"Error getting database info: {str(e)}"

        @self.mcp.tool()
        async def get_database_summary() -> Dict[str, dict]:
            """Get detailed database summary"""
            logger.info("Getting database summary")
            try:
                return await self.db_manager.get_database_summary()
            except Exception as e:
                return f"Error getting database summary: {str(e)}"

        @self.mcp.tool()
        async def list_users() -> List[str]:
            """List all database users"""
            logger.info("Listing database users")
            try:
                return await self.db_manager.list_users()
            except Exception as e:
                return f"Error listing users: {str(e)}"

        @self.mcp.tool()
        async def list_user_permissions(
            username: Annotated[str, Field(description="The username to check permissions for")]
        ) -> List[Dict[str, Any]]:
            """List permissions for a specific user"""
            logger.info(f"Listing permissions for user: {username}")
            try:
                return await self.db_manager.list_user_permissions(username)
            except Exception as e:
                return f"Error listing user permissions: {str(e)}"

        @self.mcp.tool()
        async def list_table_privileges(
            schema: Annotated[str, Field(description="The schema name")],
            table: Annotated[str, Field(description="The table name")]
        ) -> List[Dict[str, Any]]:
            """List privileges for a specific table"""
            logger.info(f"Listing table privileges for: {schema}.{table}")
            try:
                return await self.db_manager.list_table_privileges(schema, table)
            except Exception as e:
                return f"Error listing table privileges: {str(e)}"

        @self.mcp.tool()
        async def list_constraints(
            schema: Annotated[str, Field(description="The schema name")],
            table: Annotated[str, Field(description="The table name")]
        ) -> List[Dict[str, Any]]:
            """List constraints for a specific table"""
            logger.info(f"Listing constraints for table: {schema}.{table}")
            try:
                return await self.db_manager.list_constraints(schema, table)
            except Exception as e:
                return f"Error listing constraints: {str(e)}"

        @self.mcp.tool()
        async def list_foreign_keys(
            schema: Annotated[str, Field(description="The schema name")],
            table: Annotated[str, Field(description="The table name")]
        ) -> List[Dict[str, Any]]:
            """List foreign keys for a specific table"""
            logger.info(f"Listing foreign keys for table: {schema}.{table}")
            try:
                return await self.db_manager.list_foreign_keys(schema, table)
            except Exception as e:
                return f"Error listing foreign keys: {str(e)}"

        @self.mcp.tool()
        async def list_referenced_tables(
            schema: Annotated[str, Field(description="The schema name")],
            table: Annotated[str, Field(description="The table name")]
        ) -> List[Dict[str, Any]]:
            """List tables that reference this table"""
            logger.info(f"Listing referenced tables for: {schema}.{table}")
            try:
                return await self.db_manager.list_referenced_tables(schema, table)
            except Exception as e:
                return f"Error listing referenced tables: {str(e)}"

        @self.mcp.tool()
        async def get_slow_queries(
            limit: Annotated[int, Field(description="Number of slow queries to return")] = 10
        ) -> List[Dict[str, Any]]:
            """Get slow queries from database statistics"""
            logger.info(f"Getting slow queries, limit: {limit}")
            try:
                return await self.db_manager.get_slow_queries(limit)
            except Exception as e:
                return f"Error getting slow queries: {str(e)}"

        @self.mcp.tool()
        async def get_index_usage() -> List[Dict[str, Any]]:
            """Get index usage statistics"""
            logger.info("Getting index usage statistics")
            try:
                return await self.db_manager.get_index_usage()
            except Exception as e:
                return f"Error getting index usage: {str(e)}"

        @self.mcp.tool()
        async def get_table_bloat_info() -> List[Dict[str, Any]]:
            """Get table bloat information"""
            logger.info("Getting table bloat information")
            try:
                return await self.db_manager.get_table_bloat_info()
            except Exception as e:
                return f"Error getting table bloat info: {str(e)}"

        @self.mcp.tool()
        async def get_database_activity() -> List[Dict[str, Any]]:
            """Get current database activity"""
            logger.info("Getting database activity")
            try:
                return await self.db_manager.get_database_activity()
            except Exception as e:
                return f"Error getting database activity: {str(e)}"

        @self.mcp.tool()
        async def list_functions(
            schema: Annotated[str, Field(description="The schema name")]
        ) -> List[Dict[str, Any]]:
            """List functions in a specific schema"""
            logger.info(f"Listing functions in schema: {schema}")
            try:
                return await self.db_manager.list_functions(schema)
            except Exception as e:
                return f"Error listing functions: {str(e)}"

        @self.mcp.tool()
        async def get_function_definition(
            schema: Annotated[str, Field(description="The schema name")],
            function_name: Annotated[str, Field(description="The function name")]
        ) -> str:
            """Get function definition"""
            logger.info(f"Getting function definition: {schema}.{function_name}")
            try:
                return await self.db_manager.get_function_definition(schema, function_name)
            except Exception as e:
                return f"Error getting function definition: {str(e)}"

        @self.mcp.tool()
        async def list_triggers(
            schema: Annotated[str, Field(description="The schema name")],
            table: Annotated[str, Field(description="The table name")]
        ) -> List[Dict[str, Any]]:
            """List triggers for a specific table"""
            logger.info(f"Listing triggers for table: {schema}.{table}")
            try:
                return await self.db_manager.list_triggers(schema, table)
            except Exception as e:
                return f"Error listing triggers: {str(e)}"

        @self.mcp.tool()
        async def get_table_ddl(
            schema: Annotated[str, Field(description="The schema name")],
            table: Annotated[str, Field(description="The table name")]
        ) -> str:
            """Get DDL statement for a table"""
            logger.info(f"Getting table DDL: {schema}.{table}")
            try:
                return await self.db_manager.get_table_ddl(schema, table)
            except Exception as e:
                return f"Error getting table DDL: {str(e)}"

        @self.mcp.tool()
        async def list_materialized_views(
            schema: Annotated[str, Field(description="The schema name")]
        ) -> List[str]:
            """List materialized views in a specific schema"""
            logger.info(f"Listing materialized views in schema: {schema}")
            try:
                return await self.db_manager.list_materialized_views(schema)
            except Exception as e:
                return f"Error listing materialized views: {str(e)}"

        @self.mcp.tool()
        async def get_vacuum_info(
            schema: Annotated[str, Field(description="The schema name")],
            table: Annotated[str, Field(description="The table name")]
        ) -> Dict[str, Any]:
            """Get vacuum information for a table"""
            logger.info(f"Getting vacuum info for table: {schema}.{table}")
            try:
                return await self.db_manager.get_vacuum_info(schema, table)
            except Exception as e:
                return f"Error getting vacuum info: {str(e)}"

        @self.mcp.tool()
        async def list_active_connections() -> List[Dict[str, Any]]:
            """List active database connections"""
            logger.info("Listing active connections")
            try:
                return await self.db_manager.list_active_connections()
            except Exception as e:
                return f"Error listing active connections: {str(e)}"
+
+ def _setup_prompts(self):
+ """Setup MCP prompts for common database tasks"""
+
+ @self.mcp.prompt()
+ def analyze_query_performance(
+ sql: Annotated[str, Field(description="The SQL query to analyze")],
+ explain: Annotated[str, Field(description="The EXPLAIN ANALYZE
output")],
+ table_info: Annotated[str, Field(description="The table schema
information")],
+ ) -> str:
+ """Prompt for analyzing query performance"""
+ logger.info(f"Analyzing query performance for: {sql}")
+ return ANALYZE_QUERY_PERFORMANCE_PROMPT.format(
+ sql=sql,
+ explain=explain,
+ table_info=table_info
+ )
+ @self.mcp.prompt()
+ def suggest_indexes(
+ query: Annotated[str, Field(description="The common query
pattern")],
+ table_info: Annotated[str, Field(description="The table schema
information")],
+ table_stats: Annotated[str, Field(description="The table
statistics")],
+ ) -> str:
+ """Prompt for suggesting indexes"""
+ logger.info(f"Suggesting indexes for query: {query}")
+ return SUGGEST_INDEXES_PROMPT.format(
+ query=query,
+ table_info=table_info,
+ table_stats=table_stats
+ )
+
+
+ @self.mcp.prompt()
+ def database_health_check() -> str:
+ """Prompt for database health check"""
+ logger.info(f"Checking database health")
+ return DATABASE_HEALTH_CHECK_PROMPT
+
+ def run(self, mode: str="http"):
+ """Run the MCP server"""
+ if mode == "stdio":
+ return self.mcp.run(
+ transport="stdio",
+ )
+ elif mode == "http":
+ return self.mcp.run(
+ transport="streamable-http",
+ host=self.server_config.host,
+ port=self.server_config.port,
+ path=self.server_config.path,
+ stateless_http=True
+ )
+
    async def close(self):
        """Close the server and cleanup resources.

        Delegates to the DatabaseManager so pooled database connections are
        released; safe to call once at shutdown.
        """
        await self.db_manager.close()
+
+
def main():
    """Command-line entry point for the Cloudberry MCP server.

    Parses CLI arguments, configures logging, builds server/database
    configuration from the environment (CLI flags take precedence), then
    runs the server until interrupted.
    """
    import argparse
    import asyncio
    # ``sys`` must be imported here, not only in the ``__main__`` guard:
    # when main() is invoked as a console-script entry point the guard never
    # runs, and the original code raised NameError on sys.exit(1).
    import sys

    parser = argparse.ArgumentParser(
        description="Cloudberry MCP Server - Cloudberry database management tools",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s --mode stdio
  %(prog)s --mode http --host 0.0.0.0 --port 8080
  %(prog)s --mode http --log-level INFO
  %(prog)s --help
    """
    )

    parser.add_argument(
        "--mode",
        choices=["stdio", "http"],
        default="http",
        help="Server mode: stdio for stdin/stdout communication, http for HTTP server (default: http)"
    )

    parser.add_argument(
        "--host",
        default=None,
        help="HTTP server host (default: from CLOUDBERRY_MCP_HOST env var or 127.0.0.1)"
    )

    parser.add_argument(
        "--port",
        type=int,
        default=None,
        help="HTTP server port (default: from CLOUDBERRY_MCP_PORT env var or 8080)"
    )

    parser.add_argument(
        "--path",
        default=None,
        help="HTTP server path (default: from CLOUDBERRY_MCP_PATH env var or /mcp)"
    )

    parser.add_argument(
        "--log-level",
        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
        default="WARNING",
        help="Logging level (default: WARNING)"
    )

    parser.add_argument(
        "--version",
        action="version",
        version="Cloudberry MCP Server 1.0.0"
    )

    args = parser.parse_args()

    # Configure logging
    log_level = getattr(logging, args.log_level.upper())
    logging.basicConfig(
        level=log_level,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.StreamHandler()
        ]
    )

    # Create configurations from the environment.
    server_config = ServerConfig.from_env()
    db_config = DatabaseConfig.from_env()

    # Command line arguments take precedence over environment variables.
    # ``is not None`` (rather than truthiness) keeps legitimate falsy values
    # such as port 0 or an empty path from being ignored.
    if args.host is not None:
        server_config.host = args.host
    if args.port is not None:
        server_config.port = args.port
    if args.path is not None:
        server_config.path = args.path

    server = CloudberryMCPServer(server_config, db_config)

    try:
        logger.info(f"Starting server in {args.mode} mode...")
        server.run(args.mode)
    except KeyboardInterrupt:
        # Ctrl-C is a normal way to stop the server, not an error condition.
        logger.info("Server stopped by user")
    except Exception as e:
        logger.error(f"Server error: {e}")
        sys.exit(1)
    finally:
        asyncio.run(server.close())
+
+
if __name__ == "__main__":
    # Import sys before calling main() so sys.exit() is available on the
    # error path when the module is executed directly as a script.
    import sys

    main()
diff --git a/mcp-server/tests/README.md b/mcp-server/tests/README.md
new file mode 100644
index 00000000000..9fa69976b26
--- /dev/null
+++ b/mcp-server/tests/README.md
@@ -0,0 +1,115 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# Apache Cloudberry MCP Testing Guide
+
+## Test Structure
+
+This project uses the `pytest` framework for testing, supporting both
asynchronous testing and parameterized testing.
+
+### Test Files
+- `test_cbmcp.py` - Main test file containing all MCP client functionality
tests
+
+### Test Categories
+- **Unit Tests** - Test individual features independently
+- **Integration Tests** - Test overall system functionality
+- **Parameterized Tests** - Test both stdio and http modes simultaneously
+
+## Running Tests
+
+### Install Test Dependencies
+```bash
+pip install -e ".[dev]"
+```
+
+### Run All Tests
+```bash
+pytest tests/
+```
+
+### Run Specific Tests
+```bash
+# Run specific test file
+pytest tests/test_cbmcp.py
+
+# Run specific test class
+pytest tests/test_cbmcp.py::TestCloudberryMCPClient
+
+# Run specific test method
+pytest tests/test_cbmcp.py::TestCloudberryMCPClient::test_list_capabilities
+
+# Run tests for specific mode
+pytest tests/test_cbmcp.py -k "stdio"
+```
+
+### Verbose Output
+```bash
+pytest tests/ -v
+```
+
+### Coverage Testing
+```bash
+pytest tests/ --cov=src.cbmcp --cov-report=html --cov-report=term
+```
+
+## Test Features
+
+### 1. Server Capabilities Tests
+- `test_list_capabilities` - Test tool, resource, and prompt listings
+
+### 2. Resource Tests
+- `test_get_schemas_resource` - Get database schemas
+- `test_get_tables_resource` - Get table listings
+- `test_get_database_info_resource` - Get database information
+- `test_get_database_summary_resource` - Get database summary
+
+### 3. Tool Tests
+- `test_tools` - Parameterized testing of all tool calls
+ - list_tables
+ - list_views
+ - list_columns
+ - list_indexes
+ - execute_query
+ - list_large_tables
+ - get_table_stats
+ - explain_query
+
+### 4. Prompt Tests
+- `test_analyze_query_performance_prompt` - Query performance analysis prompts
+- `test_suggest_indexes_prompt` - Index suggestion prompts
+- `test_database_health_check_prompt` - Database health check prompts
+
+## Test Modes
+
+Tests support two modes:
+- **stdio** - Standard input/output mode
+- **http** - HTTP mode
+
+## Notes
+
+1. Tests will skip inaccessible features (e.g., when database is not connected)
+2. Ensure Apache Cloudberry service is started and configured correctly
+3. Check database connection configuration in .env file
+
+## Using Scripts to Run
+
+You can use the provided script to run tests:
+```bash
+./run_tests.sh
+```
\ No newline at end of file
diff --git a/mcp-server/tests/test_cbmcp.py b/mcp-server/tests/test_cbmcp.py
new file mode 100644
index 00000000000..5560c1d84e6
--- /dev/null
+++ b/mcp-server/tests/test_cbmcp.py
@@ -0,0 +1,338 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pytest
+import pytest_asyncio
+import asyncio
+import json
+from typing import Any
+from pydantic import AnyUrl
+
+from cbmcp.client import CloudberryMCPClient
+
+
class CustomJSONEncoder(json.JSONEncoder):
    """JSON encoder that renders pydantic ``AnyUrl`` values as plain strings."""

    def default(self, obj: Any) -> Any:
        if not isinstance(obj, AnyUrl):
            # Defer to the base class, which raises TypeError for
            # unserializable values.
            return super().default(obj)
        return str(obj)
+
+
@pytest.fixture
def event_loop():
    """Yield a fresh asyncio event loop per test, closing it on teardown."""
    loop = asyncio.new_event_loop()
    try:
        yield loop
    finally:
        loop.close()
+
+
@pytest_asyncio.fixture(params=["stdio", "http"])
async def client(request):
    """Yield a CloudberryMCPClient, parameterized over stdio and http modes."""
    instance = await CloudberryMCPClient.create(mode=request.param)
    try:
        yield instance
    finally:
        await instance.close()
+
+
@pytest.mark.asyncio
class TestCloudberryMCPClient:
    """Apache Cloudberry MCP client test class.

    Environment problems (server down, database unreachable) skip the test,
    but genuine assertion failures are reported as failures.  The original
    code asserted inside the ``try`` block, so AssertionError was caught by
    ``except Exception`` and a failing test was silently reported as a skip.
    The repeated call/skip/assert boilerplate is factored into helpers.
    """

    async def _tool_result_or_skip(self, client, tool_name, parameters):
        """Call *tool_name* with *parameters*, skipping if the call fails."""
        try:
            return await client.call_tool(tool_name, parameters)
        except Exception as e:
            pytest.skip(f"Skipping test - unable to call {tool_name} tool: {e}")

    @staticmethod
    def _check_tool_result(result):
        """Shared assertions on a tool-call result."""
        assert result is not None
        assert hasattr(result, 'structured_content')

    async def _resource_or_skip(self, client, uri, description):
        """Fetch the resource at *uri*, skipping if the fetch fails."""
        try:
            return await client.get_resource(uri)
        except Exception as e:
            pytest.skip(f"Skipping test - unable to get {description}: {e}")

    async def _prompt_or_skip(self, client, name, params=None):
        """Fetch prompt *name*, skipping if the fetch fails."""
        try:
            if params is None:
                return await client.get_prompt(name)
            return await client.get_prompt(name, params=params)
        except Exception as e:
            pytest.skip(f"Skipping test - unable to get {name} prompt: {e}")

    @staticmethod
    def _check_prompt(prompt):
        """Shared assertions on a prompt result."""
        assert prompt is not None
        assert prompt.description is not None
        assert isinstance(prompt.messages, list)

    async def test_list_capabilities(self, client):
        """Test server capabilities list"""
        tools = await client.list_tools()
        resources = await client.list_resources()
        prompts = await client.list_prompts()

        assert isinstance(tools, list)
        assert isinstance(resources, list)
        assert isinstance(prompts, list)

    async def test_get_schemas_resource(self, client):
        """Test getting database schemas resource"""
        schemas = await self._resource_or_skip(client, "postgres://schemas", "schemas")
        assert isinstance(schemas, list)

    async def test_get_database_info_resource(self, client):
        """Test getting database info resource"""
        db_infos = await self._resource_or_skip(client, "postgres://database/info", "database info")
        assert isinstance(db_infos, list)

    async def test_get_database_summary_resource(self, client):
        """Test getting database summary resource"""
        db_summary = await self._resource_or_skip(client, "postgres://database/summary", "database summary")
        assert isinstance(db_summary, list)

    @pytest.mark.parametrize("tool_name,parameters", [
        ("list_tables", {"schema": "public"}),
        ("list_views", {"schema": "public"}),
        ("list_columns", {"schema": "public", "table": "test"}),
        ("list_indexes", {"schema": "public", "table": "test"}),
        ("execute_query", {"query": "SELECT version()", "readonly": True}),
        ("list_large_tables", {"limit": 5}),
        ("get_table_stats", {"schema": "public", "table": "film"}),
        ("explain_query", {"query": "SELECT version()"}),
    ])
    async def test_tools(self, client, tool_name, parameters):
        """Test various tool calls"""
        result = await self._tool_result_or_skip(client, tool_name, parameters)
        self._check_tool_result(result)

    async def test_analyze_query_performance_prompt(self, client):
        """Test query performance analysis prompt"""
        prompt = await self._prompt_or_skip(
            client,
            "analyze_query_performance",
            params={
                "sql": "SELECT * FROM public.test",
                "explain": "public.test",
                "table_info": "100 rows, 10 MB"
            }
        )
        self._check_prompt(prompt)

    async def test_suggest_indexes_prompt(self, client):
        """Test index suggestion prompt"""
        prompt = await self._prompt_or_skip(
            client,
            "suggest_indexes",
            params={
                "query": "public",
                "table_info": "public.test",
                "table_stats": "100 rows, 10 MB"
            }
        )
        self._check_prompt(prompt)

    async def test_database_health_check_prompt(self, client):
        """Test database health check prompt"""
        prompt = await self._prompt_or_skip(client, "database_health_check")
        self._check_prompt(prompt)

    # User and permission management tests
    async def test_list_users(self, client):
        """Test listing all users"""
        self._check_tool_result(
            await self._tool_result_or_skip(client, "list_users", {}))

    async def test_list_user_permissions(self, client):
        """Test listing user permissions"""
        self._check_tool_result(
            await self._tool_result_or_skip(
                client, "list_user_permissions", {"username": "postgres"}))

    async def test_list_table_privileges(self, client):
        """Test listing table privileges"""
        self._check_tool_result(
            await self._tool_result_or_skip(
                client, "list_table_privileges", {"schema": "public", "table": "film"}))

    # Constraint and relationship management tests
    async def test_list_constraints(self, client):
        """Test listing constraints"""
        self._check_tool_result(
            await self._tool_result_or_skip(
                client, "list_constraints", {"schema": "public", "table": "film"}))

    async def test_list_foreign_keys(self, client):
        """Test listing foreign keys"""
        self._check_tool_result(
            await self._tool_result_or_skip(
                client, "list_foreign_keys", {"schema": "public", "table": "film"}))

    async def test_list_referenced_tables(self, client):
        """Test listing referenced tables"""
        self._check_tool_result(
            await self._tool_result_or_skip(
                client, "list_referenced_tables", {"schema": "public", "table": "film"}))

    # Performance monitoring and optimization tests
    async def test_get_slow_queries(self, client):
        """Test getting slow queries"""
        self._check_tool_result(
            await self._tool_result_or_skip(client, "get_slow_queries", {"limit": 5}))

    async def test_get_index_usage(self, client):
        """Test getting index usage"""
        self._check_tool_result(
            await self._tool_result_or_skip(client, "get_index_usage", {"schema": "public"}))

    async def test_get_table_bloat_info(self, client):
        """Test getting table bloat information"""
        self._check_tool_result(
            await self._tool_result_or_skip(
                client, "get_table_bloat_info", {"schema": "public", "limit": 5}))

    async def test_get_database_activity(self, client):
        """Test getting database activity"""
        self._check_tool_result(
            await self._tool_result_or_skip(client, "get_database_activity", {}))

    async def test_get_vacuum_info(self, client):
        """Test getting vacuum information"""
        self._check_tool_result(
            await self._tool_result_or_skip(
                client, "get_vacuum_info", {"schema": "public", "table": "film"}))

    # Database object management tests
    async def test_list_functions(self, client):
        """Test listing functions"""
        self._check_tool_result(
            await self._tool_result_or_skip(client, "list_functions", {"schema": "public"}))

    async def test_get_function_definition(self, client):
        """Test getting function definition"""
        self._check_tool_result(
            await self._tool_result_or_skip(
                client, "get_function_definition", {"schema": "public", "function_name": "now"}))

    async def test_list_triggers(self, client):
        """Test listing triggers"""
        self._check_tool_result(
            await self._tool_result_or_skip(
                client, "list_triggers", {"schema": "public", "table": "film"}))

    async def test_list_materialized_views(self, client):
        """Test listing materialized views"""
        self._check_tool_result(
            await self._tool_result_or_skip(
                client, "list_materialized_views", {"schema": "public"}))

    async def test_get_materialized_view_definition(self, client):
        """Test getting materialized view definition"""
        self._check_tool_result(
            await self._tool_result_or_skip(
                client, "get_materialized_view_definition",
                {"schema": "public", "view_name": "film"}))

    async def test_get_table_ddl(self, client):
        """Test getting table DDL"""
        self._check_tool_result(
            await self._tool_result_or_skip(
                client, "get_table_ddl", {"schema": "public", "table": "film"}))

    async def test_list_active_connections(self, client):
        """Test listing active connections"""
        self._check_tool_result(
            await self._tool_result_or_skip(client, "list_active_connections", {}))
+
+
@pytest.mark.asyncio
async def test_client_modes():
    """Smoke-test basic client functionality in both transport modes."""
    for transport in ("stdio", "http"):
        mode_client = await CloudberryMCPClient.create(mode=transport)
        try:
            # Minimal connectivity check: the tool listing must be a list.
            tools = await mode_client.list_tools()
            assert isinstance(tools, list)
        finally:
            await mode_client.close()
diff --git a/mcp-server/tests/test_database_tools.py
b/mcp-server/tests/test_database_tools.py
new file mode 100644
index 00000000000..e0fcc644869
--- /dev/null
+++ b/mcp-server/tests/test_database_tools.py
@@ -0,0 +1,303 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Database tools test module
+Tests newly added database management tool functionality
+"""
+import pytest
+import pytest_asyncio
+import asyncio
+import json
+from typing import Any
+from pydantic import AnyUrl
+
+from cbmcp.client import CloudberryMCPClient
+
+
class CustomJSONEncoder(json.JSONEncoder):
    """JSON encoder that serializes pydantic ``AnyUrl`` values as strings."""

    def default(self, obj: Any) -> Any:
        # AnyUrl is not natively JSON-serializable; everything else is
        # delegated to the base encoder (which raises TypeError as usual).
        return str(obj) if isinstance(obj, AnyUrl) else super().default(obj)
+
+
@pytest.fixture
def event_loop():
    """Create event loop for async testing"""
    policy = asyncio.get_event_loop_policy()
    new_loop = policy.new_event_loop()
    yield new_loop
    # Teardown: release the loop once the test is done with it.
    new_loop.close()
+
+
@pytest_asyncio.fixture(params=["stdio", "http"])
async def client(request):
    """Create CloudberryMCPClient instance supporting stdio and http modes"""
    # The fixture is parameterized so every test runs once per transport.
    transport = request.param
    instance = await CloudberryMCPClient.create(mode=transport)
    yield instance
    await instance.close()
+
+
@pytest.mark.asyncio
class TestDatabaseTools:
    """Database management tools test class.

    Each test invokes one MCP tool through the shared ``client`` fixture.
    Tool invocations are funneled through ``_call_tool_or_skip`` so that a
    failing call (e.g. database unavailable) skips the test, while assertion
    failures run outside the guard and are reported as real failures.
    """

    async def _call_tool_or_skip(self, client, tool_name, arguments, skip_reason):
        """Call *tool_name* with *arguments*; skip the test if the call raises.

        Only the invocation itself is guarded — callers assert on the
        returned result afterwards, so assertion errors are never
        swallowed into a skip.
        """
        try:
            return await client.call_tool(tool_name, arguments)
        except Exception as e:
            pytest.skip(f"{skip_reason}: {e}")

    # User and permission management tests
    async def test_list_users_basic(self, client):
        """Test basic user listing functionality"""
        result = await self._call_tool_or_skip(
            client, "list_users", {}, "Skipping user list test")
        assert result is not None
        assert hasattr(result, 'structured_content')

    async def test_list_user_permissions(self, client):
        """Test user permissions query"""
        result = await self._call_tool_or_skip(
            client, "list_user_permissions", {"username": "postgres"},
            "Skipping user permissions test")
        assert result is not None
        assert hasattr(result, 'structured_content')

    async def test_list_table_privileges(self, client):
        """Test table privileges query"""
        result = await self._call_tool_or_skip(
            client, "list_table_privileges",
            {"schema": "public", "table": "film"},
            "Skipping table privileges test")
        assert result is not None
        assert hasattr(result, 'structured_content')

    # Constraint and relationship management tests
    async def test_list_constraints(self, client):
        """Test constraints query"""
        result = await self._call_tool_or_skip(
            client, "list_constraints",
            {"schema": "public", "table": "film"},
            "Skipping constraints query test")
        assert result is not None
        assert hasattr(result, 'structured_content')

    async def test_list_foreign_keys(self, client):
        """Test foreign keys query"""
        result = await self._call_tool_or_skip(
            client, "list_foreign_keys",
            {"schema": "public", "table": "film"},
            "Skipping foreign keys query test")
        assert result is not None
        assert hasattr(result, 'structured_content')

    async def test_list_referenced_tables(self, client):
        """Test referenced tables query"""
        result = await self._call_tool_or_skip(
            client, "list_referenced_tables",
            {"schema": "public", "table": "film"},
            "Skipping referenced tables query test")
        assert result is not None
        assert hasattr(result, 'structured_content')

    # Performance monitoring and optimization tests
    async def test_get_slow_queries(self, client):
        """Test slow queries retrieval"""
        # Guarded for consistency with the sibling tests: skip (not error)
        # when the server/database is unavailable.
        result = await self._call_tool_or_skip(
            client, "get_slow_queries", {"limit": 5},
            "Skipping slow queries test")
        assert result is not None
        assert hasattr(result, 'structured_content')

    async def test_get_index_usage(self, client):
        """Test index usage"""
        result = await self._call_tool_or_skip(
            client, "get_index_usage", {"schema": "public"},
            "Skipping index usage test")
        assert result is not None
        assert hasattr(result, 'structured_content')

    async def test_get_table_bloat_info(self, client):
        """Test table bloat information"""
        result = await self._call_tool_or_skip(
            client, "get_table_bloat_info",
            {"schema": "public", "limit": 5},
            "Skipping table bloat info test")
        assert result is not None
        assert hasattr(result, 'structured_content')

    async def test_get_database_activity(self, client):
        """Test database activity monitoring"""
        result = await self._call_tool_or_skip(
            client, "get_database_activity", {},
            "Skipping database activity test")
        assert result is not None
        assert hasattr(result, 'structured_content')

    async def test_get_vacuum_info(self, client):
        """Test vacuum information"""
        result = await self._call_tool_or_skip(
            client, "get_vacuum_info",
            {"schema": "public", "table": "film"},
            "Skipping vacuum info test")
        assert result is not None
        assert hasattr(result, 'structured_content')

    # Database object management tests
    async def test_list_functions(self, client):
        """Test functions list"""
        # Guarded for consistency with the sibling tests.
        result = await self._call_tool_or_skip(
            client, "list_functions", {"schema": "public"},
            "Skipping functions list test")
        assert result is not None
        assert hasattr(result, 'structured_content')

    async def test_get_function_definition(self, client):
        """Test function definition retrieval"""
        result = await self._call_tool_or_skip(
            client, "get_function_definition",
            {"schema": "public", "function_name": "now"},
            "Skipping function definition test")
        assert result is not None
        assert hasattr(result, 'structured_content')

    async def test_list_triggers(self, client):
        """Test triggers list"""
        result = await self._call_tool_or_skip(
            client, "list_triggers",
            {"schema": "public", "table": "film"},
            "Skipping triggers list test")
        assert result is not None
        assert hasattr(result, 'structured_content')

    async def test_list_materialized_views(self, client):
        """Test materialized views list"""
        # Guarded for consistency with the sibling tests.
        result = await self._call_tool_or_skip(
            client, "list_materialized_views", {"schema": "public"},
            "Skipping materialized views list test")
        assert result is not None
        assert hasattr(result, 'structured_content')

    async def test_get_materialized_view_ddl(self, client):
        """Test materialized view DDL"""
        result = await self._call_tool_or_skip(
            client, "get_materialized_view_ddl",
            {"schema": "public", "view_name": "film"},
            "Skipping materialized view DDL test")
        assert result is not None
        assert hasattr(result, 'structured_content')

    async def test_get_table_ddl(self, client):
        """Test table DDL retrieval"""
        result = await self._call_tool_or_skip(
            client, "get_table_ddl",
            {"schema": "public", "table": "film"},
            "Skipping table DDL test")
        assert result is not None
        assert hasattr(result, 'structured_content')

    async def test_list_active_connections(self, client):
        """Test active connections list"""
        result = await self._call_tool_or_skip(
            client, "list_active_connections", {},
            "Skipping active connections test")
        assert result is not None
        assert hasattr(result, 'structured_content')

    async def test_all_tools_availability(self, client):
        """Test availability of all newly added tools"""
        tools = await client.list_tools()
        tool_names = [tool.name for tool in tools]

        new_tools = [
            "list_users", "list_user_permissions", "list_table_privileges",
            "list_constraints", "list_foreign_keys", "list_referenced_tables",
            "get_slow_queries", "get_index_usage", "get_table_bloat_info",
            "get_database_activity", "get_vacuum_info", "list_functions",
            "get_function_definition", "list_triggers", "list_materialized_views",
            "get_materialized_view_ddl", "get_table_ddl", "list_active_connections"
        ]

        # Informational only: which tools the connected server exposes may
        # vary by deployment, so availability is reported, not asserted.
        available_tools = [tool for tool in new_tools if tool in tool_names]
        print(f"Found {len(available_tools)} new tools: {available_tools}")

    async def test_tool_parameter_validation(self, client):
        """Test tool parameter validation"""
        # Missing the required "table" argument: any resulting error is
        # expected and deliberately ignored here.
        try:
            await client.call_tool("get_table_ddl", {"schema": "public"})
        except Exception:
            pass

        # A well-formed call must succeed (or skip if the server is down).
        result = await self._call_tool_or_skip(
            client, "get_table_ddl",
            {"schema": "public", "table": "film"},
            "Skipping DDL test")
        assert result is not None
+
@pytest.mark.asyncio
async def test_database_tools_comprehensive():
    """Comprehensive test of all database tools.

    Only the tool invocations are guarded by the skip handler; assertions
    run outside the ``try`` so a bad result fails the test instead of
    being mis-reported as a skip. The client is always closed.
    """
    client = await CloudberryMCPClient.create()
    try:
        tools = await client.list_tools()
        assert isinstance(tools, list)

        try:
            users = await client.call_tool("list_users", {})
            activity = await client.call_tool("get_database_activity", {})
            connections = await client.call_tool("list_active_connections", {})
        except Exception as e:
            pytest.skip(f"Skipping comprehensive test: {e}")

        assert users is not None
        assert activity is not None
        assert connections is not None
    finally:
        await client.close()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]