This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git


The following commit(s) were added to refs/heads/main by this push:
     new a6114c90 [api][plan] Use resource descriptor to declare a MCP server 
in Agent. (#455)
a6114c90 is described below

commit a6114c90cb695515ef3f5fc4d78be9fbf54198b8
Author: Wenjin Xie <[email protected]>
AuthorDate: Mon Jan 19 18:12:40 2026 +0800

    [api][plan] Use resource descriptor to declare a MCP server in Agent. (#455)
---
 .../apache/flink/agents/api/resource/Constant.java |  3 ++
 .../flink/agents/integrations/mcp/MCPServer.java   | 18 +++++++++--
 .../org/apache/flink/agents/plan/AgentPlan.java    |  9 +++---
 .../agents/plan/AgentPlanDeclareMCPServerTest.java | 14 +++++----
 python/flink_agents/api/resource.py                |  3 ++
 .../e2e_tests_mcp/mcp_test.py                      |  5 ++--
 python/flink_agents/integrations/mcp/mcp.py        |  8 ++---
 python/flink_agents/plan/agent_plan.py             | 35 +++++++++++++---------
 8 files changed, 63 insertions(+), 32 deletions(-)

diff --git 
a/api/src/main/java/org/apache/flink/agents/api/resource/Constant.java 
b/api/src/main/java/org/apache/flink/agents/api/resource/Constant.java
index bda3f200..32bc9e5d 100644
--- a/api/src/main/java/org/apache/flink/agents/api/resource/Constant.java
+++ b/api/src/main/java/org/apache/flink/agents/api/resource/Constant.java
@@ -79,4 +79,7 @@ public class Constant {
     // elasticsearch
     public static String ELASTICSEARCH_VECTOR_STORE =
             
"org.apache.flink.agents.integrations.vectorstores.elasticsearch.ElasticsearchVectorStore";
+
+    // MCP
+    public static String MCP_SERVER = 
"org.apache.flink.agents.integrations.mcp.MCPServer";
 }
diff --git 
a/integrations/mcp/src/main/java/org/apache/flink/agents/integrations/mcp/MCPServer.java
 
b/integrations/mcp/src/main/java/org/apache/flink/agents/integrations/mcp/MCPServer.java
index 0ba161a2..64e71a32 100644
--- 
a/integrations/mcp/src/main/java/org/apache/flink/agents/integrations/mcp/MCPServer.java
+++ 
b/integrations/mcp/src/main/java/org/apache/flink/agents/integrations/mcp/MCPServer.java
@@ -28,8 +28,9 @@ import 
io.modelcontextprotocol.client.transport.HttpClientStreamableHttpTranspor
 import io.modelcontextprotocol.spec.McpSchema;
 import org.apache.flink.agents.api.chat.messages.ChatMessage;
 import org.apache.flink.agents.api.chat.messages.MessageRole;
+import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
 import org.apache.flink.agents.api.resource.ResourceType;
-import org.apache.flink.agents.api.resource.SerializableResource;
 import org.apache.flink.agents.api.tools.ToolMetadata;
 import org.apache.flink.agents.integrations.mcp.auth.ApiKeyAuth;
 import org.apache.flink.agents.integrations.mcp.auth.Auth;
@@ -44,6 +45,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.function.BiFunction;
 
 /**
  * Resource representing an MCP server and exposing its tools/prompts.
@@ -73,7 +75,7 @@ import java.util.Objects;
  *
  * <p>Reference: <a 
href="https://modelcontextprotocol.io/sdk/java/mcp-client";>MCP Java Client</a>
  */
-public class MCPServer extends SerializableResource {
+public class MCPServer extends Resource {
 
     private static final String FIELD_ENDPOINT = "endpoint";
     private static final String FIELD_HEADERS = "headers";
@@ -131,6 +133,18 @@ public class MCPServer extends SerializableResource {
         }
     }
 
+    public MCPServer(
+            ResourceDescriptor descriptor, BiFunction<String, ResourceType, 
Resource> getResource) {
+        super(descriptor, getResource);
+        this.endpoint =
+                Objects.requireNonNull(
+                        descriptor.getArgument("endpoint"), "endpoint cannot 
be null");
+        Map<String, String> headers = descriptor.getArgument("headers");
+        this.headers = headers != null ? new HashMap<>(headers) : new 
HashMap<>();
+        this.timeoutSeconds = (int) descriptor.getArgument("timeout");
+        this.auth = descriptor.getArgument("auth");
+    }
+
     /**
      * Creates a new MCPServer instance.
      *
diff --git a/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java 
b/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
index 6526f5a4..9fa05a31 100644
--- a/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
+++ b/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
@@ -332,11 +332,12 @@ public class AgentPlan implements Serializable {
     private void extractMCPServer(Method method) throws Exception {
         // Use reflection to handle MCP classes to support Java 11 without MCP
         String name = method.getName();
-        Object mcpServer = method.invoke(null);
 
-        addResourceProvider(
-                JavaSerializableResourceProvider.createResourceProvider(
-                        name, MCP_SERVER, (SerializableResource) mcpServer));
+        ResourceDescriptor descriptor = (ResourceDescriptor) 
method.invoke(null);
+        JavaResourceProvider provider = new JavaResourceProvider(name, 
MCP_SERVER, descriptor);
+
+        addResourceProvider(provider);
+        Object mcpServer = provider.provide(null);
 
         // Call listTools() via reflection
         Method listToolsMethod = mcpServer.getClass().getMethod("listTools");
diff --git 
a/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanDeclareMCPServerTest.java
 
b/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanDeclareMCPServerTest.java
index 0be116f6..760eae18 100644
--- 
a/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanDeclareMCPServerTest.java
+++ 
b/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanDeclareMCPServerTest.java
@@ -25,11 +25,11 @@ import org.apache.flink.agents.api.agents.Agent;
 import org.apache.flink.agents.api.annotation.Action;
 import org.apache.flink.agents.api.context.RunnerContext;
 import org.apache.flink.agents.api.prompt.Prompt;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
 import org.apache.flink.agents.api.resource.ResourceType;
 import org.apache.flink.agents.api.tools.Tool;
 import org.apache.flink.agents.api.tools.ToolMetadata;
 import org.apache.flink.agents.integrations.mcp.MCPPrompt;
-import org.apache.flink.agents.integrations.mcp.MCPServer;
 import org.apache.flink.agents.integrations.mcp.MCPTool;
 import org.apache.flink.agents.plan.resourceprovider.ResourceProvider;
 import org.junit.jupiter.api.*;
@@ -39,10 +39,10 @@ import org.junit.jupiter.api.condition.JRE;
 import java.io.File;
 import java.net.HttpURLConnection;
 import java.net.URL;
-import java.time.Duration;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.flink.agents.api.resource.Constant.MCP_SERVER;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -62,7 +62,7 @@ class AgentPlanDeclareMCPServerTest {
 
     private static Process pythonMcpServerProcess;
     private static final String MCP_SERVER_SCRIPT =
-            "python/flink_agents/api/tests/mcp/mcp_server.py";
+            "python/flink_agents/integrations/mcp/tests/mcp_server.py";
     private static final String MCP_ENDPOINT = "http://127.0.0.1:8000/mcp";;
 
     private AgentPlan agentPlan;
@@ -71,8 +71,11 @@ class AgentPlanDeclareMCPServerTest {
     static class TestMCPAgent extends Agent {
 
         @org.apache.flink.agents.api.annotation.MCPServer
-        public static MCPServer testMcpServer() {
-            return 
MCPServer.builder(MCP_ENDPOINT).timeout(Duration.ofSeconds(30)).build();
+        public static ResourceDescriptor testMcpServer() {
+            return ResourceDescriptor.Builder.newBuilder(MCP_SERVER)
+                    .addInitialArgument("endpoint", MCP_ENDPOINT)
+                    .addInitialArgument("timeout", 30)
+                    .build();
         }
 
         @Action(listenEvents = {InputEvent.class})
@@ -162,6 +165,7 @@ class AgentPlanDeclareMCPServerTest {
     private static boolean isServerReady(String endpoint) {
         try {
             URL url = new URL(endpoint);
+            // noinspection StartSSRFNetHookCheckingInspection
             HttpURLConnection connection = (HttpURLConnection) 
url.openConnection();
             connection.setRequestMethod("GET");
             connection.setConnectTimeout(1000);
diff --git a/python/flink_agents/api/resource.py 
b/python/flink_agents/api/resource.py
index f7a83e46..fa0e5b4b 100644
--- a/python/flink_agents/api/resource.py
+++ b/python/flink_agents/api/resource.py
@@ -260,3 +260,6 @@ class Constant:
     JAVA_COLLECTION_MANAGEABLE_VECTOR_STORE = 
"flink_agents.api.vector_stores.java_vector_store.JavaCollectionManageableVectorStore"
     # chroma
     CHROMA_VECTOR_STORE = 
"flink_agents.integrations.vector_stores.chroma.chroma_vector_store.ChromaVectorStore"
+
+    # MCP
+    MCP_SERVER = "flink_agents.integrations.mcp.mcp.MCPServer"
diff --git 
a/python/flink_agents/e2e_tests/e2e_tests_integration/e2e_tests_mcp/mcp_test.py 
b/python/flink_agents/e2e_tests/e2e_tests_integration/e2e_tests_mcp/mcp_test.py
index 7c5e7448..37da6e26 100644
--- 
a/python/flink_agents/e2e_tests/e2e_tests_integration/e2e_tests_mcp/mcp_test.py
+++ 
b/python/flink_agents/e2e_tests/e2e_tests_integration/e2e_tests_mcp/mcp_test.py
@@ -52,7 +52,6 @@ from flink_agents.api.resource import (
 )
 from flink_agents.api.runner_context import RunnerContext
 from flink_agents.e2e_tests.test_utils import pull_model
-from flink_agents.integrations.mcp.mcp import MCPServer
 
 OLLAMA_MODEL = os.environ.get("MCP_OLLAMA_CHAT_MODEL", "qwen3:1.7b")
 MCP_SERVER_ENDPOINT = "http://127.0.0.1:8000/mcp";
@@ -70,9 +69,9 @@ class MyMCPAgent(Agent):
 
     @mcp_server
     @staticmethod
-    def my_mcp_server() -> MCPServer:
+    def my_mcp_server() -> ResourceDescriptor:
         """Define MCP server connection."""
-        return MCPServer(endpoint=MCP_SERVER_ENDPOINT)
+        return ResourceDescriptor(clazz=Constant.MCP_SERVER, 
endpoint=MCP_SERVER_ENDPOINT)
 
     @chat_model_connection
     @staticmethod
diff --git a/python/flink_agents/integrations/mcp/mcp.py 
b/python/flink_agents/integrations/mcp/mcp.py
index f430cbe0..79cf26f4 100644
--- a/python/flink_agents/integrations/mcp/mcp.py
+++ b/python/flink_agents/integrations/mcp/mcp.py
@@ -38,7 +38,7 @@ from typing_extensions import override
 
 from flink_agents.api.chat_message import ChatMessage, MessageRole
 from flink_agents.api.prompts.prompt import Prompt
-from flink_agents.api.resource import ResourceType, SerializableResource
+from flink_agents.api.resource import Resource, ResourceType
 from flink_agents.api.tools.tool import Tool, ToolMetadata, ToolType
 from flink_agents.integrations.mcp.utils import extract_mcp_content_item
 
@@ -49,7 +49,7 @@ class MCPTool(Tool):
     This represents a single tool from an MCP server.
     """
 
-    mcp_server: "MCPServer" = Field(default=None, exclude=True)
+    mcp_server: "MCPServer" = Field(default=None)
 
     @classmethod
     @override
@@ -86,7 +86,7 @@ class MCPPrompt(Prompt):
     title: str | None
     description: str | None = None
     prompt_arguments: list[PromptArgument] = Field(default_factory=list)
-    mcp_server: "MCPServer" = Field(default=None, exclude=True)
+    mcp_server: "MCPServer" = Field(default=None)
 
     def _check_arguments(self, **kwargs: str) -> Dict[str, str]:
         if self.mcp_server is None:
@@ -133,7 +133,7 @@ class MCPPrompt(Prompt):
             finally:
                 self.mcp_server = None
 
-class MCPServer(SerializableResource, ABC):
+class MCPServer(Resource, ABC):
     """Resource representing an MCP server and exposing its tools/prompts.
 
     This is a logical container for MCP tools and prompts; it is not directly 
invokable.
diff --git a/python/flink_agents/plan/agent_plan.py 
b/python/flink_agents/plan/agent_plan.py
index b9180ca1..daf66128 100644
--- a/python/flink_agents/plan/agent_plan.py
+++ b/python/flink_agents/plan/agent_plan.py
@@ -15,13 +15,12 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
#################################################################################
-from typing import Any, Dict, List, cast
+from typing import TYPE_CHECKING, Any, Dict, List, cast
 
 from pydantic import BaseModel, field_serializer, model_validator
 
 from flink_agents.api.agents.agent import Agent
-from flink_agents.api.resource import Resource, ResourceType
-from flink_agents.integrations.mcp.mcp import MCPServer
+from flink_agents.api.resource import Resource, ResourceDescriptor, 
ResourceType
 from flink_agents.plan.actions.action import Action
 from flink_agents.plan.actions.chat_model_action import CHAT_MODEL_ACTION
 from flink_agents.plan.actions.context_retrieval_action import 
CONTEXT_RETRIEVAL_ACTION
@@ -37,6 +36,9 @@ from flink_agents.plan.resource_provider import (
 )
 from flink_agents.plan.tools.function_tool import from_callable
 
+if TYPE_CHECKING:
+    from flink_agents.integrations.mcp.mcp import MCPServer
+
 BUILT_IN_ACTIONS = [CHAT_MODEL_ACTION, TOOL_CALL_ACTION, 
CONTEXT_RETRIEVAL_ACTION]
 
 
@@ -134,7 +136,7 @@ class AgentPlan(BaseModel):
                 actions_by_event[event_type].append(action.name)
 
         resource_providers = {}
-        for provider in _get_resource_providers(agent):
+        for provider in _get_resource_providers(agent, config):
             type = provider.type
             if type not in resource_providers:
                 resource_providers[type] = {}
@@ -284,7 +286,7 @@ def _get_actions(agent: Agent) -> List[Action]:
     return actions
 
 
-def _get_resource_providers(agent: Agent) -> List[ResourceProvider]:
+def _get_resource_providers(agent: Agent, config: AgentConfiguration) -> 
List[ResourceProvider]:
     resource_providers = []
     # retrieve resource declared by decorator
     for name, value in agent.__class__.__dict__.items():
@@ -334,8 +336,8 @@ def _get_resource_providers(agent: Agent) -> 
List[ResourceProvider]:
             if isinstance(value, staticmethod):
                 value = value.__func__
 
-            mcp_server = value()
-            _add_mcp_server(name, resource_providers, mcp_server)
+            descriptor = value()
+            _add_mcp_server(name, resource_providers, descriptor, config)
 
     # retrieve resource declared by add interface
     for name, prompt in agent.resources[ResourceType.PROMPT].items():
@@ -350,9 +352,8 @@ def _get_resource_providers(agent: Agent) -> 
List[ResourceProvider]:
             )
         )
 
-    for name, mcp_server in agent.resources[ResourceType.MCP_SERVER].items():
-        mcp_server = cast("MCPServer", mcp_server)
-        _add_mcp_server(name, resource_providers, mcp_server)
+    for name, descriptor in agent.resources[ResourceType.MCP_SERVER].items():
+        _add_mcp_server(name, resource_providers, descriptor)
 
     for resource_type in [
         ResourceType.CHAT_MODEL,
@@ -375,11 +376,17 @@ def _get_resource_providers(agent: Agent) -> 
List[ResourceProvider]:
 
 
 def _add_mcp_server(
-    name: str, resource_providers: List[ResourceProvider], mcp_server: 
MCPServer
+    name: str, resource_providers: List[ResourceProvider], descriptor: 
ResourceDescriptor, config: AgentConfiguration
 ) -> None:
-    resource_providers.append(
-        PythonSerializableResourceProvider.from_resource(name=name, 
resource=mcp_server)
-    )
+    provider = PythonResourceProvider.get(name=name, descriptor=descriptor)
+
+    resource_providers.append(provider)
+
+    def get_resource(name: str, descriptor: ResourceDescriptor) -> Any:
+        """Placeholder."""
+
+    mcp_server = cast("MCPServer", provider.provide(get_resource=get_resource, 
config=config))
+
     resource_providers.extend(
         [
             PythonSerializableResourceProvider.from_resource(

Reply via email to