This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push:
new d620162d [integration] Support MCP Servers that do not support prompts. (#447)
d620162d is described below
commit d620162df93cd85d1f48dc467f8eeb7fa2d5dd34
Author: JennyChen <[email protected]>
AuthorDate: Wed Feb 11 18:11:08 2026 +0800
[integration] Support MCP Servers that do not support prompts. (#447)
Co-authored-by: jennychen <[email protected]>
---
python/flink_agents/api/tools/tool.py | 3 +-
.../e2e_tests_mcp/mcp_server.py | 2 +-
...mcp_server.py => mcp_server_without_prompts.py} | 19 +----
.../e2e_tests_mcp/mcp_test.py | 93 +++++++++++++++-------
python/flink_agents/integrations/mcp/mcp.py | 8 +-
5 files changed, 76 insertions(+), 49 deletions(-)
diff --git a/python/flink_agents/api/tools/tool.py
b/python/flink_agents/api/tools/tool.py
index c42897fc..585e64f5 100644
--- a/python/flink_agents/api/tools/tool.py
+++ b/python/flink_agents/api/tools/tool.py
@@ -76,8 +76,9 @@ class ToolMetadata(BaseModel):
def __custom_deserialize(self) -> "ToolMetadata":
args_schema = self["args_schema"]
if isinstance(args_schema, dict):
+ title = args_schema.get("title", "default")
self["args_schema"] = create_model_from_schema(
- args_schema["title"], args_schema
+ title, args_schema
)
return self
diff --git
a/python/flink_agents/e2e_tests/e2e_tests_integration/e2e_tests_mcp/mcp_server.py
b/python/flink_agents/e2e_tests/e2e_tests_integration/e2e_tests_mcp/mcp_server.py
index 7bfbc0a3..7277e78d 100644
---
a/python/flink_agents/e2e_tests/e2e_tests_integration/e2e_tests_mcp/mcp_server.py
+++
b/python/flink_agents/e2e_tests/e2e_tests_integration/e2e_tests_mcp/mcp_server.py
@@ -39,7 +39,7 @@ def ask_sum(a: int, b: int) -> str:
Returns:
A formatted prompt string
"""
- return f"Calculate the sum of {a} and {b}?"
+ return f"Calculate the sum of {a} and {b} by using the add tool"
@mcp.tool()
diff --git
a/python/flink_agents/e2e_tests/e2e_tests_integration/e2e_tests_mcp/mcp_server.py
b/python/flink_agents/e2e_tests/e2e_tests_integration/e2e_tests_mcp/mcp_server_without_prompts.py
similarity index 78%
copy from
python/flink_agents/e2e_tests/e2e_tests_integration/e2e_tests_mcp/mcp_server.py
copy to
python/flink_agents/e2e_tests/e2e_tests_integration/e2e_tests_mcp/mcp_server_without_prompts.py
index 7bfbc0a3..456a5715 100644
---
a/python/flink_agents/e2e_tests/e2e_tests_integration/e2e_tests_mcp/mcp_server.py
+++
b/python/flink_agents/e2e_tests/e2e_tests_integration/e2e_tests_mcp/mcp_server_without_prompts.py
@@ -23,24 +23,7 @@ from mcp.server.fastmcp import FastMCP
dotenv.load_dotenv()
# Create MCP server
-mcp = FastMCP("MathServer")
-
-
[email protected]()
-def ask_sum(a: int, b: int) -> str:
- """Generate a prompt asking to calculate the sum of two numbers.
-
- This prompt will be used by chat models to request calculations.
-
- Args:
- a: The first operand
- b: The second operand
-
- Returns:
- A formatted prompt string
- """
- return f"Calculate the sum of {a} and {b}?"
-
+mcp = FastMCP("MathServer", port = 8001)
@mcp.tool()
async def add(a: int, b: int) -> int:
diff --git
a/python/flink_agents/e2e_tests/e2e_tests_integration/e2e_tests_mcp/mcp_test.py
b/python/flink_agents/e2e_tests/e2e_tests_integration/e2e_tests_mcp/mcp_test.py
index a5f5efc4..db2163fe 100644
---
a/python/flink_agents/e2e_tests/e2e_tests_integration/e2e_tests_mcp/mcp_test.py
+++
b/python/flink_agents/e2e_tests/e2e_tests_integration/e2e_tests_mcp/mcp_test.py
@@ -21,9 +21,10 @@ This example shows how to:
1. Define an MCP server connection
2. Use MCP prompts in chat model setups
3. Use MCP tools in actions
+4. Use MCP tools without prompts
Prerequisites:
-- Run the MCP server first: mcp_server.py
+- Run the MCP server first: mcp_server.py or mcp_server_without_prompts.py
"""
import multiprocessing
@@ -55,6 +56,7 @@ from flink_agents.e2e_tests.test_utils import pull_model
OLLAMA_MODEL = os.environ.get("MCP_OLLAMA_CHAT_MODEL", "qwen3:1.7b")
MCP_SERVER_ENDPOINT = "http://127.0.0.1:8000/mcp"
+MCP_SERVER_ENDPOINT_WITHOUT_PROMPTS = "http://127.0.0.1:8001/mcp"
class CalculationInput(BaseModel):
@@ -70,8 +72,13 @@ class MyMCPAgent(Agent):
@mcp_server
@staticmethod
def my_mcp_server() -> ResourceDescriptor:
- """Define MCP server connection."""
- return ResourceDescriptor(clazz=ResourceName.MCP_SERVER, endpoint=MCP_SERVER_ENDPOINT)
+ """Define MCP server connection based on MCP_SERVER_MODE env variable."""
+ mcp_mode = os.environ.get("MCP_SERVER_MODE", "with_prompts")
+ if mcp_mode == "without_prompts":
+ endpoint = MCP_SERVER_ENDPOINT_WITHOUT_PROMPTS
+ else:
+ endpoint = MCP_SERVER_ENDPOINT
+ return ResourceDescriptor(clazz=ResourceName.MCP_SERVER, endpoint=endpoint)
@chat_model_connection
@staticmethod
@@ -84,31 +91,43 @@ class MyMCPAgent(Agent):
@chat_model_setup
@staticmethod
def math_chat_model() -> ResourceDescriptor:
- """ChatModel using MCP prompt and tool."""
- return ResourceDescriptor(
- clazz=ResourceName.ChatModel.OLLAMA_SETUP,
- connection="ollama_connection",
- model=OLLAMA_MODEL,
- prompt="ask_sum", # MCP prompt registered from my_mcp_server
- tools=["add"], # MCP tool registered from my_mcp_server
- extract_reasoning=True,
- )
+ """ChatModel using MCP prompt and tool (or just tool if without prompts)."""
+ mcp_mode = os.environ.get("MCP_SERVER_MODE", "with_prompts")
+ descriptor_kwargs = {
+ "clazz": ResourceName.ChatModel.OLLAMA_SETUP,
+ "connection": "ollama_connection",
+ "model": OLLAMA_MODEL,
+ "tools": ["add"], # MCP tool registered from my_mcp_server
+ }
+ # Only add prompt if using server with prompts
+ if mcp_mode == "with_prompts":
+ descriptor_kwargs["prompt"] = "ask_sum" # MCP prompt registered from my_mcp_server
+ return ResourceDescriptor(**descriptor_kwargs)
@action(InputEvent)
@staticmethod
def process_input(event: InputEvent, ctx: RunnerContext) -> None:
- """Process input and send chat request using MCP prompt.
+ """Process input and send chat request.
- The MCP prompt "ask_sum" accepts parameters {a} and {b}.
+ Uses MCP prompt if MCP_SERVER_MODE is "with_prompts",
+ otherwise sends direct content message.
"""
input_data: CalculationInput = event.input
-
- # Send chat request with MCP prompt variables
- # The prompt template will be filled with a and b values
- msg = ChatMessage(
- role=MessageRole.USER,
- extra_args={"a": str(input_data.a), "b": str(input_data.b)},
- )
+ mcp_mode = os.environ.get("MCP_SERVER_MODE", "with_prompts")
+
+ if mcp_mode == "with_prompts":
+ # Send chat request with MCP prompt variables
+ # The prompt template will be filled with a and b values
+ msg = ChatMessage(
+ role=MessageRole.USER,
+ extra_args={"a": str(input_data.a), "b": str(input_data.b)},
+ )
+ else:
+ # Send chat request asking to use the add tool
+ msg = ChatMessage(
+ role=MessageRole.USER,
+ content=f"Please use the add tool to calculate the sum of {input_data.a} and {input_data.b}.",
+ )
ctx.send_event(ChatRequestEvent(model="math_chat_model", messages=[msg]))
@@ -121,9 +140,9 @@ class MyMCPAgent(Agent):
ctx.send_event(OutputEvent(output=response.content))
-def run_mcp_server() -> None:
+def run_mcp_server(server_file: str) -> None:
"""Run the MCP server in a separate process."""
- runpy.run_path(f"{current_dir}/mcp_server.py")
+ runpy.run_path(f"{current_dir}/{server_file}")
current_dir = Path(__file__).parent
@@ -131,18 +150,36 @@ current_dir = Path(__file__).parent
client = pull_model(OLLAMA_MODEL)
[email protected](
+ ("mcp_server_mode", "server_file", "server_endpoint"),
+ [
+ ("with_prompts", "mcp_server.py", MCP_SERVER_ENDPOINT),
+ ("without_prompts", "mcp_server_without_prompts.py", MCP_SERVER_ENDPOINT_WITHOUT_PROMPTS),
+ ],
+)
@pytest.mark.skipif(
client is None, reason="Ollama client is not available or test model is missing"
)
-def test_mcp() -> None: # noqa:D103
+def test_mcp(mcp_server_mode: str, server_file: str, server_endpoint: str) -> None:
+ """Test MCP integration with different server modes.
+
+ Args:
+ mcp_server_mode: "with_prompts" or "without_prompts"
+ server_file: Name of the MCP server file to run
+ server_endpoint: Endpoint URL of the MCP server
+ """
# Start MCP server in background
- print("Starting MCP server...")
- server_process = multiprocessing.Process(target=run_mcp_server)
+ print(f"Starting MCP server: {server_file}...")
+ server_process = multiprocessing.Process(target=run_mcp_server, args=(server_file,))
server_process.start()
time.sleep(5)
+ # Set environment variable to control agent behavior
+ os.environ["MCP_SERVER_MODE"] = mcp_server_mode
+
print(f"\nRunning MyMCPAgent with Ollama model: {OLLAMA_MODEL}")
- print(f"MCP server endpoint: {MCP_SERVER_ENDPOINT}\n")
+ print(f"MCP server mode: {mcp_server_mode}")
+ print(f"MCP server endpoint: {server_endpoint}\n")
env = AgentsExecutionEnvironment.get_execution_environment()
input_list = []
@@ -160,5 +197,5 @@ def test_mcp() -> None: # noqa:D103
for output in output_list:
for key, value in output.items():
print(f"{key}: {value}")
-
+ assert len(output_list) == 2
server_process.kill()
diff --git a/python/flink_agents/integrations/mcp/mcp.py
b/python/flink_agents/integrations/mcp/mcp.py
index a15c4b3d..08066673 100644
--- a/python/flink_agents/integrations/mcp/mcp.py
+++ b/python/flink_agents/integrations/mcp/mcp.py
@@ -27,6 +27,7 @@ import cloudpickle
import httpx
from mcp.client.session import ClientSession
from mcp.client.streamable_http import streamablehttp_client
+from mcp.shared.exceptions import McpError
from mcp.types import PromptArgument, TextContent
from pydantic import (
ConfigDict,
@@ -271,7 +272,12 @@ class MCPServer(Resource, ABC):
def list_prompts(self) -> List[MCPPrompt]:
"""List available prompts from the MCP server."""
- return asyncio.run(self._list_prompts_async())
+ try:
+ return asyncio.run(self._list_prompts_async())
+ except McpError as e:
+ if "prompts not supported" in str(e).lower():
+ return []
+ raise
async def _list_prompts_async(self) -> List[MCPPrompt]:
"""Async implementation of list_prompts."""