This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push:
new a6114c90 [api][plan] Use resource descriptor to declare a MCP server
in Agent. (#455)
a6114c90 is described below
commit a6114c90cb695515ef3f5fc4d78be9fbf54198b8
Author: Wenjin Xie <[email protected]>
AuthorDate: Mon Jan 19 18:12:40 2026 +0800
[api][plan] Use resource descriptor to declare a MCP server in Agent. (#455)
---
.../apache/flink/agents/api/resource/Constant.java | 3 ++
.../flink/agents/integrations/mcp/MCPServer.java | 18 +++++++++--
.../org/apache/flink/agents/plan/AgentPlan.java | 9 +++---
.../agents/plan/AgentPlanDeclareMCPServerTest.java | 14 +++++----
python/flink_agents/api/resource.py | 3 ++
.../e2e_tests_mcp/mcp_test.py | 5 ++--
python/flink_agents/integrations/mcp/mcp.py | 8 ++---
python/flink_agents/plan/agent_plan.py | 35 +++++++++++++---------
8 files changed, 63 insertions(+), 32 deletions(-)
diff --git
a/api/src/main/java/org/apache/flink/agents/api/resource/Constant.java
b/api/src/main/java/org/apache/flink/agents/api/resource/Constant.java
index bda3f200..32bc9e5d 100644
--- a/api/src/main/java/org/apache/flink/agents/api/resource/Constant.java
+++ b/api/src/main/java/org/apache/flink/agents/api/resource/Constant.java
@@ -79,4 +79,7 @@ public class Constant {
// elasticsearch
public static String ELASTICSEARCH_VECTOR_STORE =
"org.apache.flink.agents.integrations.vectorstores.elasticsearch.ElasticsearchVectorStore";
+
+ // MCP
+ public static String MCP_SERVER =
"org.apache.flink.agents.integrations.mcp.MCPServer";
}
diff --git
a/integrations/mcp/src/main/java/org/apache/flink/agents/integrations/mcp/MCPServer.java
b/integrations/mcp/src/main/java/org/apache/flink/agents/integrations/mcp/MCPServer.java
index 0ba161a2..64e71a32 100644
---
a/integrations/mcp/src/main/java/org/apache/flink/agents/integrations/mcp/MCPServer.java
+++
b/integrations/mcp/src/main/java/org/apache/flink/agents/integrations/mcp/MCPServer.java
@@ -28,8 +28,9 @@ import
io.modelcontextprotocol.client.transport.HttpClientStreamableHttpTranspor
import io.modelcontextprotocol.spec.McpSchema;
import org.apache.flink.agents.api.chat.messages.ChatMessage;
import org.apache.flink.agents.api.chat.messages.MessageRole;
+import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
import org.apache.flink.agents.api.resource.ResourceType;
-import org.apache.flink.agents.api.resource.SerializableResource;
import org.apache.flink.agents.api.tools.ToolMetadata;
import org.apache.flink.agents.integrations.mcp.auth.ApiKeyAuth;
import org.apache.flink.agents.integrations.mcp.auth.Auth;
@@ -44,6 +45,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.function.BiFunction;
/**
* Resource representing an MCP server and exposing its tools/prompts.
@@ -73,7 +75,7 @@ import java.util.Objects;
*
* <p>Reference: <a
href="https://modelcontextprotocol.io/sdk/java/mcp-client">MCP Java Client</a>
*/
-public class MCPServer extends SerializableResource {
+public class MCPServer extends Resource {
private static final String FIELD_ENDPOINT = "endpoint";
private static final String FIELD_HEADERS = "headers";
@@ -131,6 +133,18 @@ public class MCPServer extends SerializableResource {
}
}
+ public MCPServer(
+ ResourceDescriptor descriptor, BiFunction<String, ResourceType,
Resource> getResource) {
+ super(descriptor, getResource);
+ this.endpoint =
+ Objects.requireNonNull(
+ descriptor.getArgument("endpoint"), "endpoint cannot
be null");
+ Map<String, String> headers = descriptor.getArgument("headers");
+ this.headers = headers != null ? new HashMap<>(headers) : new
HashMap<>();
+ this.timeoutSeconds = (int) descriptor.getArgument("timeout");
+ this.auth = descriptor.getArgument("auth");
+ }
+
/**
* Creates a new MCPServer instance.
*
diff --git a/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
b/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
index 6526f5a4..9fa05a31 100644
--- a/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
+++ b/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
@@ -332,11 +332,12 @@ public class AgentPlan implements Serializable {
private void extractMCPServer(Method method) throws Exception {
// Use reflection to handle MCP classes to support Java 11 without MCP
String name = method.getName();
- Object mcpServer = method.invoke(null);
- addResourceProvider(
- JavaSerializableResourceProvider.createResourceProvider(
- name, MCP_SERVER, (SerializableResource) mcpServer));
+ ResourceDescriptor descriptor = (ResourceDescriptor)
method.invoke(null);
+ JavaResourceProvider provider = new JavaResourceProvider(name,
MCP_SERVER, descriptor);
+
+ addResourceProvider(provider);
+ Object mcpServer = provider.provide(null);
// Call listTools() via reflection
Method listToolsMethod = mcpServer.getClass().getMethod("listTools");
diff --git
a/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanDeclareMCPServerTest.java
b/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanDeclareMCPServerTest.java
index 0be116f6..760eae18 100644
---
a/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanDeclareMCPServerTest.java
+++
b/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanDeclareMCPServerTest.java
@@ -25,11 +25,11 @@ import org.apache.flink.agents.api.agents.Agent;
import org.apache.flink.agents.api.annotation.Action;
import org.apache.flink.agents.api.context.RunnerContext;
import org.apache.flink.agents.api.prompt.Prompt;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
import org.apache.flink.agents.api.resource.ResourceType;
import org.apache.flink.agents.api.tools.Tool;
import org.apache.flink.agents.api.tools.ToolMetadata;
import org.apache.flink.agents.integrations.mcp.MCPPrompt;
-import org.apache.flink.agents.integrations.mcp.MCPServer;
import org.apache.flink.agents.integrations.mcp.MCPTool;
import org.apache.flink.agents.plan.resourceprovider.ResourceProvider;
import org.junit.jupiter.api.*;
@@ -39,10 +39,10 @@ import org.junit.jupiter.api.condition.JRE;
import java.io.File;
import java.net.HttpURLConnection;
import java.net.URL;
-import java.time.Duration;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import static org.apache.flink.agents.api.resource.Constant.MCP_SERVER;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -62,7 +62,7 @@ class AgentPlanDeclareMCPServerTest {
private static Process pythonMcpServerProcess;
private static final String MCP_SERVER_SCRIPT =
- "python/flink_agents/api/tests/mcp/mcp_server.py";
+ "python/flink_agents/integrations/mcp/tests/mcp_server.py";
private static final String MCP_ENDPOINT = "http://127.0.0.1:8000/mcp";
private AgentPlan agentPlan;
@@ -71,8 +71,11 @@ class AgentPlanDeclareMCPServerTest {
static class TestMCPAgent extends Agent {
@org.apache.flink.agents.api.annotation.MCPServer
- public static MCPServer testMcpServer() {
- return
MCPServer.builder(MCP_ENDPOINT).timeout(Duration.ofSeconds(30)).build();
+ public static ResourceDescriptor testMcpServer() {
+ return ResourceDescriptor.Builder.newBuilder(MCP_SERVER)
+ .addInitialArgument("endpoint", MCP_ENDPOINT)
+ .addInitialArgument("timeout", 30)
+ .build();
}
@Action(listenEvents = {InputEvent.class})
@@ -162,6 +165,7 @@ class AgentPlanDeclareMCPServerTest {
private static boolean isServerReady(String endpoint) {
try {
URL url = new URL(endpoint);
+ // noinspection StartSSRFNetHookCheckingInspection
HttpURLConnection connection = (HttpURLConnection)
url.openConnection();
connection.setRequestMethod("GET");
connection.setConnectTimeout(1000);
diff --git a/python/flink_agents/api/resource.py
b/python/flink_agents/api/resource.py
index f7a83e46..fa0e5b4b 100644
--- a/python/flink_agents/api/resource.py
+++ b/python/flink_agents/api/resource.py
@@ -260,3 +260,6 @@ class Constant:
JAVA_COLLECTION_MANAGEABLE_VECTOR_STORE =
"flink_agents.api.vector_stores.java_vector_store.JavaCollectionManageableVectorStore"
# chroma
CHROMA_VECTOR_STORE =
"flink_agents.integrations.vector_stores.chroma.chroma_vector_store.ChromaVectorStore"
+
+ # MCP
+ MCP_SERVER = "flink_agents.integrations.mcp.mcp.MCPServer"
diff --git
a/python/flink_agents/e2e_tests/e2e_tests_integration/e2e_tests_mcp/mcp_test.py
b/python/flink_agents/e2e_tests/e2e_tests_integration/e2e_tests_mcp/mcp_test.py
index 7c5e7448..37da6e26 100644
---
a/python/flink_agents/e2e_tests/e2e_tests_integration/e2e_tests_mcp/mcp_test.py
+++
b/python/flink_agents/e2e_tests/e2e_tests_integration/e2e_tests_mcp/mcp_test.py
@@ -52,7 +52,6 @@ from flink_agents.api.resource import (
)
from flink_agents.api.runner_context import RunnerContext
from flink_agents.e2e_tests.test_utils import pull_model
-from flink_agents.integrations.mcp.mcp import MCPServer
OLLAMA_MODEL = os.environ.get("MCP_OLLAMA_CHAT_MODEL", "qwen3:1.7b")
MCP_SERVER_ENDPOINT = "http://127.0.0.1:8000/mcp"
@@ -70,9 +69,9 @@ class MyMCPAgent(Agent):
@mcp_server
@staticmethod
- def my_mcp_server() -> MCPServer:
+ def my_mcp_server() -> ResourceDescriptor:
"""Define MCP server connection."""
- return MCPServer(endpoint=MCP_SERVER_ENDPOINT)
+ return ResourceDescriptor(clazz=Constant.MCP_SERVER,
endpoint=MCP_SERVER_ENDPOINT)
@chat_model_connection
@staticmethod
diff --git a/python/flink_agents/integrations/mcp/mcp.py
b/python/flink_agents/integrations/mcp/mcp.py
index f430cbe0..79cf26f4 100644
--- a/python/flink_agents/integrations/mcp/mcp.py
+++ b/python/flink_agents/integrations/mcp/mcp.py
@@ -38,7 +38,7 @@ from typing_extensions import override
from flink_agents.api.chat_message import ChatMessage, MessageRole
from flink_agents.api.prompts.prompt import Prompt
-from flink_agents.api.resource import ResourceType, SerializableResource
+from flink_agents.api.resource import Resource, ResourceType
from flink_agents.api.tools.tool import Tool, ToolMetadata, ToolType
from flink_agents.integrations.mcp.utils import extract_mcp_content_item
@@ -49,7 +49,7 @@ class MCPTool(Tool):
This represents a single tool from an MCP server.
"""
- mcp_server: "MCPServer" = Field(default=None, exclude=True)
+ mcp_server: "MCPServer" = Field(default=None)
@classmethod
@override
@@ -86,7 +86,7 @@ class MCPPrompt(Prompt):
title: str | None
description: str | None = None
prompt_arguments: list[PromptArgument] = Field(default_factory=list)
- mcp_server: "MCPServer" = Field(default=None, exclude=True)
+ mcp_server: "MCPServer" = Field(default=None)
def _check_arguments(self, **kwargs: str) -> Dict[str, str]:
if self.mcp_server is None:
@@ -133,7 +133,7 @@ class MCPPrompt(Prompt):
finally:
self.mcp_server = None
-class MCPServer(SerializableResource, ABC):
+class MCPServer(Resource, ABC):
"""Resource representing an MCP server and exposing its tools/prompts.
This is a logical container for MCP tools and prompts; it is not directly
invokable.
diff --git a/python/flink_agents/plan/agent_plan.py
b/python/flink_agents/plan/agent_plan.py
index b9180ca1..daf66128 100644
--- a/python/flink_agents/plan/agent_plan.py
+++ b/python/flink_agents/plan/agent_plan.py
@@ -15,13 +15,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#################################################################################
-from typing import Any, Dict, List, cast
+from typing import TYPE_CHECKING, Any, Dict, List, cast
from pydantic import BaseModel, field_serializer, model_validator
from flink_agents.api.agents.agent import Agent
-from flink_agents.api.resource import Resource, ResourceType
-from flink_agents.integrations.mcp.mcp import MCPServer
+from flink_agents.api.resource import Resource, ResourceDescriptor,
ResourceType
from flink_agents.plan.actions.action import Action
from flink_agents.plan.actions.chat_model_action import CHAT_MODEL_ACTION
from flink_agents.plan.actions.context_retrieval_action import
CONTEXT_RETRIEVAL_ACTION
@@ -37,6 +36,9 @@ from flink_agents.plan.resource_provider import (
)
from flink_agents.plan.tools.function_tool import from_callable
+if TYPE_CHECKING:
+ from flink_agents.integrations.mcp.mcp import MCPServer
+
BUILT_IN_ACTIONS = [CHAT_MODEL_ACTION, TOOL_CALL_ACTION,
CONTEXT_RETRIEVAL_ACTION]
@@ -134,7 +136,7 @@ class AgentPlan(BaseModel):
actions_by_event[event_type].append(action.name)
resource_providers = {}
- for provider in _get_resource_providers(agent):
+ for provider in _get_resource_providers(agent, config):
type = provider.type
if type not in resource_providers:
resource_providers[type] = {}
@@ -284,7 +286,7 @@ def _get_actions(agent: Agent) -> List[Action]:
return actions
-def _get_resource_providers(agent: Agent) -> List[ResourceProvider]:
+def _get_resource_providers(agent: Agent, config: AgentConfiguration) ->
List[ResourceProvider]:
resource_providers = []
# retrieve resource declared by decorator
for name, value in agent.__class__.__dict__.items():
@@ -334,8 +336,8 @@ def _get_resource_providers(agent: Agent) ->
List[ResourceProvider]:
if isinstance(value, staticmethod):
value = value.__func__
- mcp_server = value()
- _add_mcp_server(name, resource_providers, mcp_server)
+ descriptor = value()
+ _add_mcp_server(name, resource_providers, descriptor, config)
# retrieve resource declared by add interface
for name, prompt in agent.resources[ResourceType.PROMPT].items():
@@ -350,9 +352,8 @@ def _get_resource_providers(agent: Agent) ->
List[ResourceProvider]:
)
)
- for name, mcp_server in agent.resources[ResourceType.MCP_SERVER].items():
- mcp_server = cast("MCPServer", mcp_server)
- _add_mcp_server(name, resource_providers, mcp_server)
+ for name, descriptor in agent.resources[ResourceType.MCP_SERVER].items():
+ _add_mcp_server(name, resource_providers, descriptor)
for resource_type in [
ResourceType.CHAT_MODEL,
@@ -375,11 +376,17 @@ def _get_resource_providers(agent: Agent) ->
List[ResourceProvider]:
def _add_mcp_server(
- name: str, resource_providers: List[ResourceProvider], mcp_server:
MCPServer
+ name: str, resource_providers: List[ResourceProvider], descriptor:
ResourceDescriptor, config: AgentConfiguration
) -> None:
- resource_providers.append(
- PythonSerializableResourceProvider.from_resource(name=name,
resource=mcp_server)
- )
+ provider = PythonResourceProvider.get(name=name, descriptor=descriptor)
+
+ resource_providers.append(provider)
+
+ def get_resource(name: str, descriptor: ResourceDescriptor) -> Any:
+ """Placeholder."""
+
+ mcp_server = cast("MCPServer", provider.provide(get_resource=get_resource,
config=config))
+
resource_providers.extend(
[
PythonSerializableResourceProvider.from_resource(