This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch release-0.2 in repository https://gitbox.apache.org/repos/asf/flink-agents.git
commit b4e90edce56cfb5b7025860d15f5048dc53d6c8e Author: JennyChen <[email protected]> AuthorDate: Wed Feb 11 18:11:08 2026 +0800 [integration] Support MCP Servers that do not support prompts. (#447) Co-authored-by: jennychen <[email protected]> --- python/flink_agents/api/tools/tool.py | 3 +- .../e2e_tests_mcp/mcp_server.py | 2 +- ...mcp_server.py => mcp_server_without_prompts.py} | 19 +---- .../e2e_tests_mcp/mcp_test.py | 93 +++++++++++++++------- python/flink_agents/integrations/mcp/mcp.py | 8 +- 5 files changed, 76 insertions(+), 49 deletions(-) diff --git a/python/flink_agents/api/tools/tool.py b/python/flink_agents/api/tools/tool.py index c42897fc..585e64f5 100644 --- a/python/flink_agents/api/tools/tool.py +++ b/python/flink_agents/api/tools/tool.py @@ -76,8 +76,9 @@ class ToolMetadata(BaseModel): def __custom_deserialize(self) -> "ToolMetadata": args_schema = self["args_schema"] if isinstance(args_schema, dict): + title = args_schema.get("title", "default") self["args_schema"] = create_model_from_schema( - args_schema["title"], args_schema + title, args_schema ) return self diff --git a/python/flink_agents/e2e_tests/e2e_tests_integration/e2e_tests_mcp/mcp_server.py b/python/flink_agents/e2e_tests/e2e_tests_integration/e2e_tests_mcp/mcp_server.py index 7bfbc0a3..7277e78d 100644 --- a/python/flink_agents/e2e_tests/e2e_tests_integration/e2e_tests_mcp/mcp_server.py +++ b/python/flink_agents/e2e_tests/e2e_tests_integration/e2e_tests_mcp/mcp_server.py @@ -39,7 +39,7 @@ def ask_sum(a: int, b: int) -> str: Returns: A formatted prompt string """ - return f"Calculate the sum of {a} and {b}?" + return f"Calculate the sum of {a} and {b} by using the add tool" @mcp.tool() diff --git a/python/flink_agents/e2e_tests/e2e_tests_integration/e2e_tests_mcp/mcp_server.py b/python/flink_agents/e2e_tests/e2e_tests_integration/e2e_tests_mcp/mcp_server_without_prompts.py similarity index 78% copy from python/flink_agents/e2e_tests/e2e_tests_integration/e2e_tests_mcp/mcp_server.py copy to python/flink_agents/e2e_tests/e2e_tests_integration/e2e_tests_mcp/mcp_server_without_prompts.py index 7bfbc0a3..456a5715 100644 --- a/python/flink_agents/e2e_tests/e2e_tests_integration/e2e_tests_mcp/mcp_server.py +++ b/python/flink_agents/e2e_tests/e2e_tests_integration/e2e_tests_mcp/mcp_server_without_prompts.py @@ -23,24 +23,7 @@ from mcp.server.fastmcp import FastMCP dotenv.load_dotenv() # Create MCP server -mcp = FastMCP("MathServer") - - [email protected]() -def ask_sum(a: int, b: int) -> str: - """Generate a prompt asking to calculate the sum of two numbers. - - This prompt will be used by chat models to request calculations. - - Args: - a: The first operand - b: The second operand - - Returns: - A formatted prompt string - """ - return f"Calculate the sum of {a} and {b}?" - +mcp = FastMCP("MathServer", port = 8001) @mcp.tool() async def add(a: int, b: int) -> int: diff --git a/python/flink_agents/e2e_tests/e2e_tests_integration/e2e_tests_mcp/mcp_test.py b/python/flink_agents/e2e_tests/e2e_tests_integration/e2e_tests_mcp/mcp_test.py index a5f5efc4..db2163fe 100644 --- a/python/flink_agents/e2e_tests/e2e_tests_integration/e2e_tests_mcp/mcp_test.py +++ b/python/flink_agents/e2e_tests/e2e_tests_integration/e2e_tests_mcp/mcp_test.py @@ -21,9 +21,10 @@ This example shows how to: 1. Define an MCP server connection 2. Use MCP prompts in chat model setups 3. Use MCP tools in actions +4. Use MCP tools without prompts Prerequisites: -- Run the MCP server first: mcp_server.py +- Run the MCP server first: mcp_server.py or mcp_server_without_prompts.py """ import multiprocessing @@ -55,6 +56,7 @@ from flink_agents.e2e_tests.test_utils import pull_model OLLAMA_MODEL = os.environ.get("MCP_OLLAMA_CHAT_MODEL", "qwen3:1.7b") MCP_SERVER_ENDPOINT = "http://127.0.0.1:8000/mcp" +MCP_SERVER_ENDPOINT_WITHOUT_PROMPTS = "http://127.0.0.1:8001/mcp" class CalculationInput(BaseModel): @@ -70,8 +72,13 @@ class MyMCPAgent(Agent): @mcp_server @staticmethod def my_mcp_server() -> ResourceDescriptor: - """Define MCP server connection.""" - return ResourceDescriptor(clazz=ResourceName.MCP_SERVER, endpoint=MCP_SERVER_ENDPOINT) + """Define MCP server connection based on MCP_SERVER_MODE env variable.""" + mcp_mode = os.environ.get("MCP_SERVER_MODE", "with_prompts") + if mcp_mode == "without_prompts": + endpoint = MCP_SERVER_ENDPOINT_WITHOUT_PROMPTS + else: + endpoint = MCP_SERVER_ENDPOINT + return ResourceDescriptor(clazz=ResourceName.MCP_SERVER, endpoint=endpoint) @chat_model_connection @staticmethod @@ -84,31 +91,43 @@ class MyMCPAgent(Agent): @chat_model_setup @staticmethod def math_chat_model() -> ResourceDescriptor: - """ChatModel using MCP prompt and tool.""" - return ResourceDescriptor( - clazz=ResourceName.ChatModel.OLLAMA_SETUP, - connection="ollama_connection", - model=OLLAMA_MODEL, - prompt="ask_sum", # MCP prompt registered from my_mcp_server - tools=["add"], # MCP tool registered from my_mcp_server - extract_reasoning=True, - ) + """ChatModel using MCP prompt and tool (or just tool if without prompts).""" + mcp_mode = os.environ.get("MCP_SERVER_MODE", "with_prompts") + descriptor_kwargs = { + "clazz": ResourceName.ChatModel.OLLAMA_SETUP, + "connection": "ollama_connection", + "model": OLLAMA_MODEL, + "tools": ["add"], # MCP tool registered from my_mcp_server + } + # Only add prompt if using server with prompts + if mcp_mode == "with_prompts": + descriptor_kwargs["prompt"] = "ask_sum" # MCP prompt registered from my_mcp_server + return ResourceDescriptor(**descriptor_kwargs) @action(InputEvent) @staticmethod def process_input(event: InputEvent, ctx: RunnerContext) -> None: - """Process input and send chat request using MCP prompt. + """Process input and send chat request. - The MCP prompt "ask_sum" accepts parameters {a} and {b}. + Uses MCP prompt if MCP_SERVER_MODE is "with_prompts", + otherwise sends direct content message. """ input_data: CalculationInput = event.input - - # Send chat request with MCP prompt variables - # The prompt template will be filled with a and b values - msg = ChatMessage( - role=MessageRole.USER, - extra_args={"a": str(input_data.a), "b": str(input_data.b)}, - ) + mcp_mode = os.environ.get("MCP_SERVER_MODE", "with_prompts") + + if mcp_mode == "with_prompts": + # Send chat request with MCP prompt variables + # The prompt template will be filled with a and b values + msg = ChatMessage( + role=MessageRole.USER, + extra_args={"a": str(input_data.a), "b": str(input_data.b)}, + ) + else: + # Send chat request asking to use the add tool + msg = ChatMessage( + role=MessageRole.USER, + content=f"Please use the add tool to calculate the sum of {input_data.a} and {input_data.b}.", + ) ctx.send_event(ChatRequestEvent(model="math_chat_model", messages=[msg])) @@ -121,9 +140,9 @@ class MyMCPAgent(Agent): ctx.send_event(OutputEvent(output=response.content)) -def run_mcp_server() -> None: +def run_mcp_server(server_file: str) -> None: """Run the MCP server in a separate process.""" - runpy.run_path(f"{current_dir}/mcp_server.py") + runpy.run_path(f"{current_dir}/{server_file}") current_dir = Path(__file__).parent @@ -131,18 +150,36 @@ current_dir = Path(__file__).parent client = pull_model(OLLAMA_MODEL) [email protected]( + ("mcp_server_mode", "server_file", "server_endpoint"), + [ + ("with_prompts", "mcp_server.py", MCP_SERVER_ENDPOINT), + ("without_prompts", "mcp_server_without_prompts.py", MCP_SERVER_ENDPOINT_WITHOUT_PROMPTS), + ], +) @pytest.mark.skipif( client is None, reason="Ollama client is not available or test model is missing" ) -def test_mcp() -> None: # noqa:D103 +def test_mcp(mcp_server_mode: str, server_file: str, server_endpoint: str) -> None: + """Test MCP integration with different server modes. + + Args: + mcp_server_mode: "with_prompts" or "without_prompts" + server_file: Name of the MCP server file to run + server_endpoint: Endpoint URL of the MCP server + """ # Start MCP server in background - print("Starting MCP server...") - server_process = multiprocessing.Process(target=run_mcp_server) + print(f"Starting MCP server: {server_file}...") + server_process = multiprocessing.Process(target=run_mcp_server, args=(server_file,)) server_process.start() time.sleep(5) + # Set environment variable to control agent behavior + os.environ["MCP_SERVER_MODE"] = mcp_server_mode + print(f"\nRunning MyMCPAgent with Ollama model: {OLLAMA_MODEL}") - print(f"MCP server endpoint: {MCP_SERVER_ENDPOINT}\n") + print(f"MCP server mode: {mcp_server_mode}") + print(f"MCP server endpoint: {server_endpoint}\n") env = AgentsExecutionEnvironment.get_execution_environment() input_list = [] @@ -160,5 +197,5 @@ def test_mcp() -> None: # noqa:D103 for output in output_list: for key, value in output.items(): print(f"{key}: {value}") - + assert len(output_list) == 2 server_process.kill() diff --git a/python/flink_agents/integrations/mcp/mcp.py b/python/flink_agents/integrations/mcp/mcp.py index a15c4b3d..08066673 100644 --- a/python/flink_agents/integrations/mcp/mcp.py +++ b/python/flink_agents/integrations/mcp/mcp.py @@ -27,6 +27,7 @@ import cloudpickle import httpx from mcp.client.session import ClientSession from mcp.client.streamable_http import streamablehttp_client +from mcp.shared.exceptions import McpError from mcp.types import PromptArgument, TextContent from pydantic import ( ConfigDict, @@ -271,7 +272,12 @@ class MCPServer(Resource, ABC): def list_prompts(self) -> List[MCPPrompt]: """List available prompts from the MCP server.""" - return asyncio.run(self._list_prompts_async()) + try: + return asyncio.run(self._list_prompts_async()) + except McpError as e: + if "prompts not supported" in str(e).lower(): + return [] + raise async def _list_prompts_async(self) -> List[MCPPrompt]: """Async implementation of list_prompts."""
