This is an automated email from the ASF dual-hosted git repository. critas pushed a commit to branch wx_0616 in repository https://gitbox.apache.org/repos/asf/iotdb-mcp-server.git
commit 69ded10a3745009c3c5e1518f62e4706a0a052b9 Author: CritasWang <[email protected]> AuthorDate: Mon Jun 16 11:17:05 2025 +0800 add export tools --- README.md | 81 +++-- src/iotdb_mcp_server/config.py | 15 +- src/iotdb_mcp_server/server.py | 782 ++++++++++++++++++++++++++--------------- 3 files changed, 570 insertions(+), 308 deletions(-) diff --git a/README.md b/README.md index 457021f..db5fd54 100644 --- a/README.md +++ b/README.md @@ -3,56 +3,78 @@ [](https://smithery.ai/server/@apache/iotdb-mcp-server) ## Overview + A Model Context Protocol (MCP) server implementation that provides database interaction and business intelligence capabilities through IoTDB. This server enables running SQL queries. ## Components ### Resources + The server doesn't expose any resources. ### Prompts + The server doesn't provide any prompts. ### Tools + The server offers different tools for IoTDB Tree Model and Table Model. You can choose between them by setting the "IOTDB_SQL_DIALECT" configuration to either "tree" or "table". 
#### Tree Model + - `metadata_query` - - Execute SHOW/COUNT queries to read metadata from the database - - Input: - - `query_sql` (string): The SHOW/COUNT SQL query to execute - - Returns: Query results as array of objects + - Execute SHOW/COUNT queries to read metadata from the database + - Input: + - `query_sql` (string): The SHOW/COUNT SQL query to execute + - Returns: Query results as array of objects - `select_query` - - Execute SELECT queries to read data from the database - - Input: - - `query_sql` (string): The SELECT SQL query to execute - - Returns: Query results as array of objects + - Execute SELECT queries to read data from the database + - Input: + - `query_sql` (string): The SELECT SQL query to execute + - Returns: Query results as array of objects +- `export_query` + - Execute a query and export the results to a CSV or Excel file + - Input: + - `query_sql` (string): The SQL query to execute (using TREE dialect) + - `format` (string): Export format, either "csv" or "excel" (default: "csv") + - Returns: Information about the exported file and a preview of the data #### Table Model ##### Query Tools + - `read_query` - - Execute SELECT queries to read data from the database - - Input: - - `query` (string): The SELECT SQL query to execute - - Returns: Query results as array of objects + - Execute SELECT queries to read data from the database + - Input: + - `query` (string): The SELECT SQL query to execute + - Returns: Query results as array of objects ##### Schema Tools + - `list_tables` - - Get a list of all tables in the database - - No input required - - Returns: Array of table names -- `describe-table` - - View schema information for a specific table - - Input: - - `table_name` (string): Name of table to describe - - Returns: Array of column definitions with names and types + - Get a list of all tables in the database + - No input required + - Returns: Array of table names + +- `describe_table` + - View schema information for a specific table + - 
Input: + - `table_name` (string): Name of table to describe + - Returns: Array of column definitions with names and types + +- `export_table_query` + - Execute a query and export the results to a CSV or Excel file + - Input: + - `query_sql` (string): The SQL query to execute (using TABLE dialect) + - `format` (string): Export format, either "csv" or "excel" (default: "csv") + - Returns: Information about the exported file and a preview of the data ## Claude Desktop Integration ## Prerequisites + - Python with `uv` package manager - IoTDB installation - MCP server dependencies @@ -72,8 +94,6 @@ source venv/bin/activate # or `venv\Scripts\activate` on Windows uv sync ``` - - Configure the MCP server in Claude Desktop's configuration file: #### MacOS @@ -86,6 +106,20 @@ Location: `%APPDATA%/Claude/claude_desktop_config.json` **You may need to put the full path to the uv executable in the command field. You can get this by running `which uv` on MacOS/Linux or `where uv` on Windows.** +### Configuration Options + +The server can be configured using the following environment variables or command-line arguments: + +| Option | Environment Variable | Default Value | Description | +| ----------- | -------------------- | ------------- | -------------------------------------------- | +| Host | `IOTDB_HOST` | 192.168.99.19 | IoTDB server host | +| Port | `IOTDB_PORT` | 6667 | IoTDB server port | +| User | `IOTDB_USER` | root | IoTDB username | +| Password | `IOTDB_PASSWORD` | root | IoTDB password | +| Database | `IOTDB_DATABASE` | czxk | IoTDB database name | +| SQL Dialect | `IOTDB_SQL_DIALECT` | table | SQL dialect to use (tree or table) | +| Export Path | `IOTDB_EXPORT_PATH` | /tmp | Path to directory for exported query results | + ```json { "mcpServers": { @@ -103,7 +137,8 @@ Location: `%APPDATA%/Claude/claude_desktop_config.json` "IOTDB_USER": "root", "IOTDB_PASSWORD": "root", "IOTDB_DATABASE": "test", - "IOTDB_SQL_DIALECT": "table" + "IOTDB_SQL_DIALECT": "table", + 
"IOTDB_EXPORT_PATH": "/path/to/export/folder" } } } diff --git a/src/iotdb_mcp_server/config.py b/src/iotdb_mcp_server/config.py index c571e2b..26ee527 100644 --- a/src/iotdb_mcp_server/config.py +++ b/src/iotdb_mcp_server/config.py @@ -56,6 +56,11 @@ class Config: """ SQL dialect: tree or table """ + + export_path: str + """ + Path for exporting query results + """ @staticmethod def from_env_arguments() -> "Config": @@ -91,7 +96,7 @@ class Config: help="IoTDB password", default=os.getenv("IOTDB_PASSWORD", "root"), ) - + parser.add_argument( "--database", type=str, @@ -105,6 +110,13 @@ class Config: help="SQL dialect: tree or table", default=os.getenv("IOTDB_SQL_DIALECT", "table"), ) + + parser.add_argument( + "--export-path", + type=str, + help="Path for exporting query results", + default=os.getenv("IOTDB_EXPORT_PATH", "/tmp"), + ) args = parser.parse_args() return Config( @@ -114,4 +126,5 @@ class Config: password=args.password, database=args.database, sql_dialect=args.sql_dialect, + export_path=args.export_path, ) diff --git a/src/iotdb_mcp_server/server.py b/src/iotdb_mcp_server/server.py index 53e1fb4..a7bbcfe 100644 --- a/src/iotdb_mcp_server/server.py +++ b/src/iotdb_mcp_server/server.py @@ -1,284 +1,498 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. 
See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import logging -import datetime - -from iotdb.Session import Session -from iotdb.SessionPool import SessionPool, PoolConfig -from iotdb.utils.SessionDataSet import SessionDataSet -from iotdb.table_session import TableSession -from iotdb.table_session_pool import TableSessionPool, TableSessionPoolConfig -from mcp.server.fastmcp import FastMCP -from mcp.types import ( - TextContent, -) - -from iotdb_mcp_server.config import Config - -# Initialize FastMCP server -mcp = FastMCP("iotdb_mcp_server") - -# Configure logging -logging.basicConfig( - level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" -) - -logger = logging.getLogger("iotdb_mcp_server") - -config = Config.from_env_arguments() - -db_config = { - "host": config.host, - "port": config.port, - "user": config.user, - "password": config.password, - "database": config.database, - "sql_dialect": config.sql_dialect, -} - -logger.info(f"IoTDB Config: {db_config}") - -if config.sql_dialect == "tree": - - pool_config = PoolConfig( - node_urls=[str(config.host) + ":" + str(config.port)], - user_name=config.user, - password=config.password, - fetch_size=1024, - time_zone="UTC+8", - max_retry=3 - ) - max_pool_size = 5 - wait_timeout_in_ms = 3000 - session_pool = SessionPool(pool_config, max_pool_size, wait_timeout_in_ms) - - @mcp.tool() - async def metadata_query(query_sql: str) -> list[TextContent]: - """Execute metadata queries on IoTDB to explore database structure and statistics. - - Args: - query_sql: The metadata query to execute. 
Supported queries: - - SHOW DATABASES [path]: List all databases or databases under a specific path - - SHOW TIMESERIES [path]: List all time series or time series under a specific path - - SHOW CHILD PATHS [path]: List child paths under a specific path - - SHOW CHILD NODES [path]: List child nodes under a specific path - - SHOW DEVICES [path]: List all devices or devices under a specific path - - COUNT TIMESERIES [path]: Count time series under a specific path - - COUNT NODES [path]: Count nodes under a specific path - - COUNT DEVICES [path]: Count devices under a specific path - - if path is not provided, the query will be applied to root.** - - Examples: - SHOW DATABASES root.** - SHOW TIMESERIES root.ln.** - SHOW CHILD PATHS root.ln - SHOW CHILD PATHS root.ln.*.* - SHOW CHILD NODES root.ln - SHOW DEVICES root.ln.** - COUNT TIMESERIES root.ln.** - COUNT NODES root.ln - COUNT DEVICES root.ln - """ - session = session_pool.get_session() - try: - stmt = query_sql.strip().upper() - - # 处理SHOW DATABASES - if ( - stmt.startswith("SHOW DATABASES") - or stmt.startswith("SHOW TIMESERIES") - or stmt.startswith("SHOW CHILD PATHS") - or stmt.startswith("SHOW CHILD NODES") - or stmt.startswith("SHOW DEVICES") - or stmt.startswith("COUNT TIMESERIES") - or stmt.startswith("COUNT NODES") - or stmt.startswith("COUNT DEVICES") - ): - res = session.execute_query_statement(query_sql) - return prepare_res(res, session) - else: - raise ValueError("Unsupported metadata query. Please use one of the supported query types.") - - except Exception as e: - session.close() - raise e - - @mcp.tool() - async def select_query(query_sql: str) -> list[TextContent]: - """Execute a SELECT query on the IoTDB tree SQL dialect. - - Args: - query_sql: The SQL query to execute (using TREE dialect) - - SQL Syntax: - SELECT [LAST] selectExpr [, selectExpr] ... - [INTO intoItem [, intoItem] ...] - FROM prefixPath [, prefixPath] ... 
- [WHERE whereCondition] - [GROUP BY { - ([startTime, endTime), interval [, slidingStep]) | - LEVEL = levelNum [, levelNum] ... | - TAGS(tagKey [, tagKey] ... | - VARIATION(expression[,delta][,ignoreNull=true/false]) | - CONDITION(expression,[keep>/>=/=/</<=]threshold[,ignoreNull=true/false]) | - SESSION(timeInterval) | - COUNT(expression, size[,ignoreNull=true/false]) - }] - [HAVING havingCondition] - [ORDER BY sortKey {ASC | DESC}] - [FILL ({PREVIOUS | LINEAR | constant}) (, interval=DURATION_LITERAL)?)] - [SLIMIT seriesLimit] [SOFFSET seriesOffset] - [LIMIT rowLimit] [OFFSET rowOffset] - [ALIGN BY {TIME | DEVICE}] - - Examples: - select temperature from root.ln.wf01.wt01 where time < 2017-11-01T00:08:00.000 - select status, temperature from root.ln.wf01.wt01 where (time > 2017-11-01T00:05:00.000 and time < 2017-11-01T00:12:00.000) or (time >= 2017-11-01T16:35:00.000 and time <= 2017-11-01T16:37:00.000) - select * from root.ln.** where time > 1 order by time desc limit 10; - - Supported Aggregate Functions: - SUM - COUNT - MAX_VALUE - MIN_VALUE - AVG - VARIANCE - MAX_TIME - MIN_TIME - ... 
- """ - session = session_pool.get_session() - res = session.execute_query_statement(query_sql) - - stmt = query_sql.strip().upper() - # Regular SELECT queries - if ( - stmt.startswith("SELECT") - ): - return prepare_res(res, session) - # Non-SELECT queries - else: - raise ValueError("Only SELECT queries are allowed for read_query") - - def prepare_res( - _res: SessionDataSet, _session: Session - ) -> list[TextContent]: - columns = _res.get_column_names() - result = [] - while _res.has_next(): - record = _res.next() - if columns[0] == "Time": - timestamp = record.get_timestamp() - # formatted_time = datetime.datetime.fromtimestamp(timestamp/1000).strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] - row = record.get_fields() - result.append(str(timestamp) + "," + ",".join(map(str, row))) - else: - row = record.get_fields() - result.append(",".join(map(str, row))) - _session.close() - return [ - TextContent( - type="text", - text="\n".join([",".join(columns)] + result), - ) - ] - -elif config.sql_dialect == "table": - - session_pool_config = TableSessionPoolConfig( - node_urls=[str(config.host) + ":" + str(config.port)], - username=config.user, - password=config.password, - database=None if len(config.database) == 0 else config.database, - ) - session_pool = TableSessionPool(session_pool_config) - - @mcp.tool() - async def read_query(query_sql: str) -> list[TextContent]: - """Execute a SELECT query on the IoTDB. Please use table sql_dialect when generating SQL queries. 
- - Args: - query_sql: The SQL query to execute (using TABLE dialect) - """ - table_session = session_pool.get_session() - res = table_session.execute_query_statement(query_sql) - - stmt = query_sql.strip().upper() - # Regular SELECT queries - if ( - stmt.startswith("SELECT") - or stmt.startswith("DESCRIBE") - or stmt.startswith("SHOW") - ): - return prepare_res(res, table_session) - # Non-SELECT queries - else: - raise ValueError("Only SELECT queries are allowed for read_query") - - - @mcp.tool() - async def list_tables() -> list[TextContent]: - """List all tables in the IoTDB database.""" - table_session = session_pool.get_session() - res = table_session.execute_query_statement("SHOW TABLES") - - result = ["Tables_in_" + db_config["database"]] # Header - while res.has_next(): - result.append(str(res.next().get_fields()[0])) - table_session.close() - return [TextContent(type="text", text="\n".join(result))] - - - @mcp.tool() - async def describe_table(table_name: str) -> list[TextContent]: - """Get the schema information for a specific table - Args: - table_name: name of the table to describe - """ - table_session = session_pool.get_session() - res = table_session.execute_query_statement("DESC " + table_name) - - return prepare_res(res, table_session) - - - def prepare_res( - _res: SessionDataSet, _table_session: TableSession - ) -> list[TextContent]: - columns = _res.get_column_names() - result = [] - while _res.has_next(): - row = _res.next().get_fields() - result.append(",".join(map(str, row))) - _table_session.close() - return [ - TextContent( - type="text", - text="\n".join([",".join(columns)] + result), - ) - ] - - -def main(): - logger.info("iotdb_mcp_server running with stdio transport") - # Initialize and run the server - mcp.run(transport="stdio") - - -if __name__ == "__main__": - main() +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import logging +import datetime +import asyncio +import os +import uuid +import pandas as pd +from typing import Dict, Any, List, Union + +from iotdb.Session import Session +from iotdb.SessionPool import SessionPool, PoolConfig +from iotdb.utils.SessionDataSet import SessionDataSet +from iotdb.table_session import TableSession +from iotdb.table_session_pool import TableSessionPool, TableSessionPoolConfig +from mcp.server.fastmcp import FastMCP +from mcp.types import ( + TextContent, +) + +from iotdb_mcp_server.config import Config + +# Initialize FastMCP server +mcp = FastMCP("iotdb_mcp_server") + +# Configure logging +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s" +) + +logger = logging.getLogger("iotdb_mcp_server") + +config = Config.from_env_arguments() + +db_config = { + "host": config.host, + "port": config.port, + "user": config.user, + "password": config.password, + "database": config.database, + "sql_dialect": config.sql_dialect, + "export_path": config.export_path, +} + +max_pool_size = 100 # Increased from 5 for better concurrency + +logger.info(f"IoTDB Config: {db_config}") + +# Ensure export directory exists +if not os.path.exists(config.export_path): + try: + os.makedirs(config.export_path) + 
logger.info(f"Created export directory: {config.export_path}") + except Exception as e: + logger.warning(f"Failed to create export directory {config.export_path}: {str(e)}") + +if config.sql_dialect == "tree": + + # Configure connection pool with optimized settings + pool_config = PoolConfig( + node_urls=[str(config.host) + ":" + str(config.port)], + user_name=config.user, + password=config.password, + fetch_size=1024, # Fetch size for queries + time_zone="UTC+8", # Consistent timezone + max_retry=3 # Connection retry attempts + ) + # Optimize pool size based on expected concurrent queries + wait_timeout_in_ms = 5000 # Increased from 3000 for better reliability + session_pool = SessionPool(pool_config, max_pool_size, wait_timeout_in_ms) + + @mcp.tool() + async def metadata_query(query_sql: str) -> list[TextContent]: + """Execute metadata queries on IoTDB to explore database structure and statistics. + + Args: + query_sql: The metadata query to execute. Supported queries: + - SHOW DATABASES [path]: List all databases or databases under a specific path + - SHOW TIMESERIES [path]: List all time series or time series under a specific path + - SHOW CHILD PATHS [path]: List child paths under a specific path + - SHOW CHILD NODES [path]: List child nodes under a specific path + - SHOW DEVICES [path]: List all devices or devices under a specific path + - COUNT TIMESERIES [path]: Count time series under a specific path + - COUNT NODES [path]: Count nodes under a specific path + - COUNT DEVICES [path]: Count devices under a specific path + - if path is not provided, the query will be applied to root.** + + Examples: + SHOW DATABASES root.** + SHOW TIMESERIES root.ln.** + SHOW CHILD PATHS root.ln + SHOW CHILD PATHS root.ln.*.* + SHOW CHILD NODES root.ln + SHOW DEVICES root.ln.** + COUNT TIMESERIES root.ln.** + COUNT NODES root.ln + COUNT DEVICES root.ln + """ + session = None + try: + session = session_pool.get_session() + stmt = query_sql.strip().upper() + + # Process SHOW 
DATABASES + if ( + stmt.startswith("SHOW DATABASES") + or stmt.startswith("SHOW TIMESERIES") + or stmt.startswith("SHOW CHILD PATHS") + or stmt.startswith("SHOW CHILD NODES") + or stmt.startswith("SHOW DEVICES") + or stmt.startswith("COUNT TIMESERIES") + or stmt.startswith("COUNT NODES") + or stmt.startswith("COUNT DEVICES") + ): + res = session.execute_query_statement(query_sql) + return prepare_res(res, session) + else: + session.close() + raise ValueError("Unsupported metadata query. Please use one of the supported query types.") + except Exception as e: + if session: + session.close() + logger.error(f"Failed to execute metadata query: {str(e)}") + raise + + @mcp.tool() + async def select_query(query_sql: str) -> list[TextContent]: + """Execute a SELECT query on the IoTDB tree SQL dialect. + + Args: + query_sql: The SQL query to execute (using TREE dialect) + + SQL Syntax: + SELECT [LAST] selectExpr [, selectExpr] ... + [INTO intoItem [, intoItem] ...] + FROM prefixPath [, prefixPath] ... + [WHERE whereCondition] + [GROUP BY { + ([startTime, endTime), interval [, slidingStep]) | + LEVEL = levelNum [, levelNum] ... | + TAGS(tagKey [, tagKey] ... 
| + VARIATION(expression[,delta][,ignoreNull=true/false]) | + CONDITION(expression,[keep>/>=/=/</<=]threshold[,ignoreNull=true/false]) | + SESSION(timeInterval) | + COUNT(expression, size[,ignoreNull=true/false]) + }] + [HAVING havingCondition] + [ORDER BY sortKey {ASC | DESC}] + [FILL ({PREVIOUS | LINEAR | constant}) (, interval=DURATION_LITERAL)?)] + [SLIMIT seriesLimit] [SOFFSET seriesOffset] + [LIMIT rowLimit] [OFFSET rowOffset] + [ALIGN BY {TIME | DEVICE}] + + Examples: + select temperature from root.ln.wf01.wt01 where time < 2017-11-01T00:08:00.000 + select status, temperature from root.ln.wf01.wt01 where (time > 2017-11-01T00:05:00.000 and time < 2017-11-01T00:12:00.000) or (time >= 2017-11-01T16:35:00.000 and time <= 2017-11-01T16:37:00.000) + select * from root.ln.** where time > 1 order by time desc limit 10; + + Supported Aggregate Functions: + SUM + COUNT + MAX_VALUE + MIN_VALUE + AVG + VARIANCE + MAX_TIME + MIN_TIME + ... + """ + session = None + try: + session = session_pool.get_session() + stmt = query_sql.strip().upper() + + # Regular SELECT queries + if stmt.startswith("SELECT"): + res = session.execute_query_statement(query_sql) + return prepare_res(res, session) + else: + session.close() + raise ValueError("Only SELECT queries are allowed for select_query") + except Exception as e: + if session: + session.close() + logger.error(f"Failed to execute select query: {str(e)}") + raise + + @mcp.tool() + async def export_query(query_sql: str, format: str = "csv") -> list[TextContent]: + """Execute a query and export the results to a CSV or Excel file. 
+ + Args: + query_sql: The SQL query to execute (using TREE dialect) + format: Export format, either "csv" or "excel" (default: "csv") + + SQL Syntax: + SELECT ⟨select_list⟩ + FROM ⟨tables⟩ + [WHERE ⟨condition⟩] + [GROUP BY ⟨groups⟩] + [HAVING ⟨group_filter⟩] + [FILL ⟨fill_methods⟩] + [ORDER BY ⟨order_expression⟩] + [OFFSET ⟨n⟩] + [LIMIT ⟨n⟩]; + + Returns: + Information about the exported file and a preview of the data (max 10 rows) + """ + session = None + try: + session = session_pool.get_session() + stmt = query_sql.strip().upper() + + if stmt.startswith("SELECT") or stmt.startswith("SHOW"): + # Execute the query + res = session.execute_query_statement(query_sql) + + # Get column names + columns = res.get_column_names() + + # Collect all rows + rows = [] + while res.has_next(): + record = res.next() + if columns[0] == "Time": + timestamp = record.get_timestamp() + row = record.get_fields() + rows.append([timestamp] + row) + else: + rows.append(record.get_fields()) + + # Close the session + session.close() + + # Create a pandas DataFrame + df = pd.DataFrame(rows, columns=columns) + + # Generate unique filename with timestamp + timestamp = int(datetime.datetime.now().timestamp()) + filename = f"dump_{uuid.uuid4().hex[:4]}_{timestamp}" + filepath = "" + + if format.lower() == "csv": + filepath = f"{config.export_path}/{filename}.csv" + df.to_csv(filepath, index=False) + elif format.lower() == "excel": + filepath = f"{config.export_path}/{filename}.xlsx" + df.to_excel(filepath, index=False) + else: + raise ValueError("Format must be either 'csv' or 'excel'") + + # Generate preview (first 10 rows) + preview_rows = min(10, len(rows)) + preview_data = [] + preview_data.append(",".join(columns)) # Header + + for i in range(preview_rows): + preview_data.append(",".join(map(str, rows[i]))) + + # Return information + return [ + TextContent( + type="text", + text=f"Query results exported to {filepath}\n\nPreview (first {preview_rows} rows):\n" + "\n".join(preview_data) + ) 
+ ] + else: + raise ValueError("Only SELECT or SHOW queries are allowed for export") + except Exception as e: + if session: + session.close() + logger.error(f"Failed to export query: {str(e)}") + raise + + def prepare_res( + _res: SessionDataSet, _session: Session + ) -> list[TextContent]: + columns = _res.get_column_names() + result = [] + while _res.has_next(): + record = _res.next() + if columns[0] == "Time": + timestamp = record.get_timestamp() + # formatted_time = datetime.datetime.fromtimestamp(timestamp/1000).strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + row = record.get_fields() + result.append(str(timestamp) + "," + ",".join(map(str, row))) + else: + row = record.get_fields() + result.append(",".join(map(str, row))) + _session.close() + return [ + TextContent( + type="text", + text="\n".join([",".join(columns)] + result), + ) + ] + +elif config.sql_dialect == "table": + + session_pool_config = TableSessionPoolConfig( + node_urls=[str(config.host) + ":" + str(config.port)], + username=config.user, + password=config.password, + max_pool_size=max_pool_size, # Increased from 5 for better concurrency + database=None if len(config.database) == 0 else config.database, + ) + session_pool = TableSessionPool(session_pool_config) + + @mcp.tool() + async def read_query(query_sql: str) -> list[TextContent]: + """Execute a SELECT query on the IoTDB. Please use table sql_dialect when generating SQL queries. 
+ + Args: + query_sql: The SQL query to execute (using TABLE dialect) + """ + table_session = None + try: + table_session = session_pool.get_session() + stmt = query_sql.strip().upper() + + # Regular SELECT queries + if ( + stmt.startswith("SELECT") + or stmt.startswith("DESCRIBE") + or stmt.startswith("SHOW") + ): + res = table_session.execute_query_statement(query_sql) + return prepare_res(res, table_session) + else: + table_session.close() + raise ValueError("Only SELECT queries are allowed for read_query") + except Exception as e: + if table_session: + table_session.close() + logger.error(f"Failed to execute query: {str(e)}") + raise + + @mcp.tool() + async def list_tables() -> list[TextContent]: + """List all tables in the IoTDB database.""" + table_session = None + try: + table_session = session_pool.get_session() + res = table_session.execute_query_statement("SHOW TABLES") + + result = ["Tables_in_" + db_config["database"]] # Header + while res.has_next(): + result.append(str(res.next().get_fields()[0])) + table_session.close() + return [TextContent(type="text", text="\n".join(result))] + except Exception as e: + if table_session: + table_session.close() + logger.error(f"Failed to list tables: {str(e)}") + raise + + @mcp.tool() + async def describe_table(table_name: str) -> list[TextContent]: + """Get the schema information for a specific table + Args: + table_name: name of the table to describe + """ + table_session = None + try: + table_session = session_pool.get_session() + res = table_session.execute_query_statement("DESC " + table_name + " details") + return prepare_res(res, table_session) + except Exception as e: + if table_session: + table_session.close() + logger.error(f"Failed to describe table {table_name}: {str(e)}") + raise + + @mcp.tool() + async def export_table_query(query_sql: str, format: str = "csv") -> list[TextContent]: + """Execute a query and export the results to a CSV or Excel file. 
+ + Args: + query_sql: The SQL query to execute (using TABLE dialect) + format: Export format, either "csv" or "excel" (default: "csv") + + SQL Syntax: + SELECT ⟨select_list⟩ + FROM ⟨tables⟩ + [WHERE ⟨condition⟩] + [GROUP BY ⟨groups⟩] + [HAVING ⟨group_filter⟩] + [FILL ⟨fill_methods⟩] + [ORDER BY ⟨order_expression⟩] + [OFFSET ⟨n⟩] + [LIMIT ⟨n⟩]; + + Returns: + Information about the exported file and a preview of the data (max 10 rows) + """ + table_session = None + try: + table_session = session_pool.get_session() + stmt = query_sql.strip().upper() + + if stmt.startswith("SELECT") or stmt.startswith("SHOW") or stmt.startswith("DESCRIBE") or stmt.startswith("DESC"): + # Execute the query + res = table_session.execute_query_statement(query_sql) + + # Get column names + columns = res.get_column_names() + + # Collect all rows + rows = [] + while res.has_next(): + row = res.next().get_fields() + rows.append(row) + + # Close the session + table_session.close() + + # Create a pandas DataFrame + df = pd.DataFrame(rows, columns=columns) + + # Generate unique filename with timestamp + timestamp = int(datetime.datetime.now().timestamp()) + filename = f"dump_{uuid.uuid4().hex[:4]}_{timestamp}" + filepath = "" + + if format.lower() == "csv": + filepath = f"{config.export_path}/{filename}.csv" + df.to_csv(filepath, index=False) + elif format.lower() == "excel": + filepath = f"{config.export_path}/{filename}.xlsx" + df.to_excel(filepath, index=False) + else: + raise ValueError("Format must be either 'csv' or 'excel'") + + # Generate preview (first 10 rows) + preview_rows = min(10, len(rows)) + preview_data = [] + preview_data.append(",".join(columns)) # Header + + for i in range(preview_rows): + preview_data.append(",".join(map(str, rows[i]))) + + # Return information + return [ + TextContent( + type="text", + text=f"Query results exported to {filepath}\n\nPreview (first {preview_rows} rows):\n" + "\n".join(preview_data) + ) + ] + else: + raise ValueError("Only SELECT, SHOW or 
DESCRIBE queries are allowed for export") + except Exception as e: + if table_session: + table_session.close() + logger.error(f"Failed to export table query: {str(e)}") + raise + + def prepare_res( + _res: SessionDataSet, _table_session: TableSession + ) -> list[TextContent]: + columns = _res.get_column_names() + result = [] + while _res.has_next(): + row = _res.next().get_fields() + result.append(",".join(map(str, row))) + _table_session.close() + return [ + TextContent( + type="text", + text="\n".join([",".join(columns)] + result), + ) + ] + + +def main(): + logger.info("iotdb_mcp_server running with stdio transport") + # Initialize and run the server + mcp.run(transport="stdio") + + +if __name__ == "__main__": + main()
