This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git


The following commit(s) were added to refs/heads/main by this push:
     new f8eac10b [plan] Extract ResourceCache and PythonResourceBridge from 
AgentPlan (#548)
f8eac10b is described below

commit f8eac10b870ad62ab1799fe9d7d35cc25d40185c
Author: Weiqing Yang <[email protected]>
AuthorDate: Thu Apr 2 03:02:16 2026 -0700

    [plan] Extract ResourceCache and PythonResourceBridge from AgentPlan (#548)
---
 .../org/apache/flink/agents/plan/AgentPlan.java    | 119 ----------
 .../agents/plan/AgentPlanDeclareChatModelTest.java |  38 +++-
 .../agents/plan/AgentPlanDeclareMCPServerTest.java |  19 +-
 .../agents/plan/AgentPlanDeclareToolFieldTest.java |  21 +-
 .../plan/AgentPlanDeclareToolMethodTest.java       |  55 ++++-
 .../apache/flink/agents/plan/AgentPlanTest.java    | 121 -----------
 .../flink/agents/plan/FunctionToolPlanTest.java    |  22 +-
 python/flink_agents/plan/agent_plan.py             |  44 +---
 python/flink_agents/plan/tests/test_agent_plan.py  |   4 +-
 .../flink_agents/runtime/flink_runner_context.py   |  14 +-
 python/flink_agents/runtime/local_runner.py        |  12 +-
 python/flink_agents/runtime/resource_cache.py      |  90 ++++++++
 .../agents/runtime/PythonMCPResourceDiscovery.java |  92 ++++++++
 .../apache/flink/agents/runtime/ResourceCache.java | 134 ++++++++++++
 .../runtime/context/JavaRunnerContextImpl.java     |   4 +-
 .../agents/runtime/context/RunnerContextImpl.java  |  12 +-
 .../runtime/operator/ActionExecutionOperator.java  |  23 +-
 .../python/context/PythonRunnerContextImpl.java    |   4 +-
 .../flink/agents/runtime/ResourceCacheTest.java    | 241 +++++++++++++++++++++
 19 files changed, 744 insertions(+), 325 deletions(-)

diff --git a/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java 
b/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
index 0641e819..054b2ed2 100644
--- a/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
+++ b/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
@@ -28,7 +28,6 @@ import org.apache.flink.agents.api.resource.Resource;
 import org.apache.flink.agents.api.resource.ResourceDescriptor;
 import org.apache.flink.agents.api.resource.ResourceType;
 import org.apache.flink.agents.api.resource.SerializableResource;
-import org.apache.flink.agents.api.resource.python.PythonResourceAdapter;
 import org.apache.flink.agents.api.resource.python.PythonResourceWrapper;
 import org.apache.flink.agents.api.tools.ToolMetadata;
 import org.apache.flink.agents.plan.actions.Action;
@@ -60,7 +59,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Function;
 
 import static org.apache.flink.agents.api.resource.ResourceType.MCP_SERVER;
@@ -86,17 +84,11 @@ public class AgentPlan implements Serializable {
 
     private AgentConfiguration config;
 
-    private transient PythonResourceAdapter pythonResourceAdapter;
-
-    /** Cache for instantiated resources. */
-    private transient Map<ResourceType, Map<String, Resource>> resourceCache;
-
     public AgentPlan(Map<String, Action> actions, Map<String, List<Action>> 
actionsByEvent) {
         this.actions = actions;
         this.actionsByEvent = actionsByEvent;
         this.resourceProviders = new HashMap<>();
         this.config = new AgentConfiguration();
-        this.resourceCache = new ConcurrentHashMap<>();
     }
 
     public AgentPlan(
@@ -106,7 +98,6 @@ public class AgentPlan implements Serializable {
         this.actions = actions;
         this.actionsByEvent = actionsByEvent;
         this.resourceProviders = resourceProviders;
-        this.resourceCache = new ConcurrentHashMap<>();
         this.config = new AgentConfiguration();
     }
 
@@ -118,7 +109,6 @@ public class AgentPlan implements Serializable {
         this.actions = actions;
         this.actionsByEvent = actionsByEvent;
         this.resourceProviders = resourceProviders;
-        this.resourceCache = new ConcurrentHashMap<>();
         this.config = config;
     }
 
@@ -140,62 +130,6 @@ public class AgentPlan implements Serializable {
         this.config = config;
     }
 
-    public void setPythonResourceAdapter(PythonResourceAdapter adapter) throws 
Exception {
-        this.pythonResourceAdapter = adapter;
-        Map<String, ResourceProvider> servers = 
resourceProviders.get(MCP_SERVER);
-        if (servers == null) {
-            return;
-        }
-        servers.values().stream()
-                .filter(PythonResourceProvider.class::isInstance)
-                .map(PythonResourceProvider.class::cast)
-                .forEach(
-                        provider -> {
-                            provider.setPythonResourceAdapter(adapter);
-
-                            // Get tools and prompts from server
-                            try {
-                                PythonMCPServer server =
-                                        (PythonMCPServer)
-                                                provider.provide(
-                                                        (String anotherName,
-                                                                ResourceType 
anotherType) -> {
-                                                            try {
-                                                                return 
this.getResource(
-                                                                        
anotherName, anotherType);
-                                                            } catch (Exception 
e) {
-                                                                throw new 
RuntimeException(e);
-                                                            }
-                                                        });
-
-                                // Add tools to cache
-                                server.listTools()
-                                        .forEach(
-                                                tool ->
-                                                        resourceCache
-                                                                
.computeIfAbsent(
-                                                                        TOOL,
-                                                                        k ->
-                                                                               
 new ConcurrentHashMap<>())
-                                                                
.put(tool.getName(), tool));
-
-                                // Add prompts to cache
-                                server.listPrompts()
-                                        .forEach(
-                                                prompt ->
-                                                        resourceCache
-                                                                
.computeIfAbsent(
-                                                                        PROMPT,
-                                                                        k ->
-                                                                               
 new ConcurrentHashMap<>())
-                                                                
.put(prompt.getName(), prompt));
-                            } catch (Exception e) {
-                                throw new RuntimeException(
-                                        "Failed to process Python MCP server 
in Java", e);
-                            }
-                        });
-    }
-
     public Map<String, Action> getActions() {
         return actions;
     }
@@ -220,49 +154,6 @@ public class AgentPlan implements Serializable {
         return actionsByEvent.get(eventType);
     }
 
-    /**
-     * Get resource from agent plan.
-     *
-     * @param name the resource name
-     * @param type the resource type
-     * @return the resource instance
-     * @throws Exception if the resource cannot be found or created
-     */
-    public Resource getResource(String name, ResourceType type) throws 
Exception {
-        // Check cache first
-        if (resourceCache.containsKey(type) && 
resourceCache.get(type).containsKey(name)) {
-            return resourceCache.get(type).get(name);
-        }
-
-        // Get resource provider
-        if (!resourceProviders.containsKey(type)
-                || !resourceProviders.get(type).containsKey(name)) {
-            throw new IllegalArgumentException("Resource not found: " + name + 
" of type " + type);
-        }
-
-        ResourceProvider provider = resourceProviders.get(type).get(name);
-
-        if (pythonResourceAdapter != null && provider instanceof 
PythonResourceProvider) {
-            ((PythonResourceProvider) 
provider).setPythonResourceAdapter(pythonResourceAdapter);
-        }
-
-        // Create resource using provider
-        Resource resource =
-                provider.provide(
-                        (String anotherName, ResourceType anotherType) -> {
-                            try {
-                                return this.getResource(anotherName, 
anotherType);
-                            } catch (Exception e) {
-                                throw new RuntimeException(e);
-                            }
-                        });
-
-        // Cache the resource
-        resourceCache.computeIfAbsent(type, k -> new 
ConcurrentHashMap<>()).put(name, resource);
-
-        return resource;
-    }
-
     public AgentConfiguration getConfig() {
         return config;
     }
@@ -271,15 +162,6 @@ public class AgentPlan implements Serializable {
         return config.getConfData();
     }
 
-    public void close() throws Exception {
-        for (Map<String, Resource> resources : resourceCache.values()) {
-            for (Resource resource : resources.values()) {
-                resource.close();
-            }
-        }
-        resourceCache.clear();
-    }
-
     private void writeObject(ObjectOutputStream out) throws IOException {
         String serializedStr = new ObjectMapper().writeValueAsString(this);
         out.writeUTF(serializedStr);
@@ -292,7 +174,6 @@ public class AgentPlan implements Serializable {
         this.actionsByEvent = agentPlan.getActionsByEvent();
         this.resourceProviders = agentPlan.getResourceProviders();
         this.config = agentPlan.getConfig();
-        this.resourceCache = new ConcurrentHashMap<>();
     }
 
     private void extractActions(
diff --git 
a/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanDeclareChatModelTest.java
 
b/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanDeclareChatModelTest.java
index a829deff..6f1ecb60 100644
--- 
a/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanDeclareChatModelTest.java
+++ 
b/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanDeclareChatModelTest.java
@@ -93,6 +93,18 @@ class AgentPlanDeclareChatModelTest {
         agentPlan = new AgentPlan(new ChatAgent());
     }
 
+    /** Resolves a resource directly from its provider. */
+    private Resource resolveResource(String name, ResourceType type) throws 
Exception {
+        return agentPlan
+                .getResourceProviders()
+                .get(type)
+                .get(name)
+                .provide(
+                        (n, t) -> {
+                            throw new UnsupportedOperationException("No 
dependencies expected");
+                        });
+    }
+
     @Test
     @DisplayName("Discover @ChatModel in AgentPlan resource providers")
     void discoverChatModel() {
@@ -107,8 +119,7 @@ class AgentPlanDeclareChatModelTest {
     @DisplayName("Retrieve chat model and invoke chat(Prompt)")
     void retrieveAndChat() throws Exception {
         BaseChatModelSetup model =
-                (BaseChatModelSetup)
-                        agentPlan.getResource("testChatModel", 
ResourceType.CHAT_MODEL);
+                (BaseChatModelSetup) resolveResource("testChatModel", 
ResourceType.CHAT_MODEL);
         assertNotNull(model);
 
         Prompt prompt = Prompt.fromText("Hello world");
@@ -126,7 +137,15 @@ class AgentPlanDeclareChatModelTest {
         AgentPlan restored = mapper.readValue(json, AgentPlan.class);
 
         BaseChatModelSetup model =
-                (BaseChatModelSetup) restored.getResource("testChatModel", 
ResourceType.CHAT_MODEL);
+                (BaseChatModelSetup)
+                        restored.getResourceProviders()
+                                .get(ResourceType.CHAT_MODEL)
+                                .get("testChatModel")
+                                .provide(
+                                        (n, t) -> {
+                                            throw new 
UnsupportedOperationException(
+                                                    "No dependencies 
expected");
+                                        });
         ChatMessage reply =
                 
model.chat(Prompt.fromText("Hi").formatMessages(MessageRole.USER, new 
HashMap<>()));
         assertEquals("ok:Hi", reply.getContent());
@@ -148,10 +167,17 @@ class AgentPlanDeclareChatModelTest {
         AgentPlan actualPlan = new AgentPlan(agent);
         BaseChatModelSetup actualChatModel =
                 (BaseChatModelSetup)
-                        actualPlan.getResource("testChatModel", 
ResourceType.CHAT_MODEL);
+                        actualPlan
+                                .getResourceProviders()
+                                .get(ResourceType.CHAT_MODEL)
+                                .get("testChatModel")
+                                .provide(
+                                        (n, t) -> {
+                                            throw new 
UnsupportedOperationException(
+                                                    "No dependencies 
expected");
+                                        });
         BaseChatModelSetup expectedChatModel =
-                (BaseChatModelSetup)
-                        agentPlan.getResource("testChatModel", 
ResourceType.CHAT_MODEL);
+                (BaseChatModelSetup) resolveResource("testChatModel", 
ResourceType.CHAT_MODEL);
         Assertions.assertEquals(expectedChatModel.getClass(), 
actualChatModel.getClass());
         Assertions.assertEquals(expectedChatModel.getConnection(), 
actualChatModel.getConnection());
         Assertions.assertEquals(expectedChatModel.getModel(), 
actualChatModel.getModel());
diff --git 
a/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanDeclareMCPServerTest.java
 
b/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanDeclareMCPServerTest.java
index 09d9e6a7..e3c4fe2e 100644
--- 
a/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanDeclareMCPServerTest.java
+++ 
b/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanDeclareMCPServerTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.agents.api.agents.Agent;
 import org.apache.flink.agents.api.annotation.Action;
 import org.apache.flink.agents.api.context.RunnerContext;
 import org.apache.flink.agents.api.prompt.Prompt;
+import org.apache.flink.agents.api.resource.Resource;
 import org.apache.flink.agents.api.resource.ResourceDescriptor;
 import org.apache.flink.agents.api.resource.ResourceName;
 import org.apache.flink.agents.api.resource.ResourceType;
@@ -184,6 +185,18 @@ class AgentPlanDeclareMCPServerTest {
         agentPlan = new AgentPlan(new TestMCPAgent());
     }
 
+    /** Resolves a resource directly from its provider. */
+    private Resource resolveResource(String name, ResourceType type) throws 
Exception {
+        return agentPlan
+                .getResourceProviders()
+                .get(type)
+                .get(name)
+                .provide(
+                        (n, t) -> {
+                            throw new UnsupportedOperationException("No 
dependencies expected");
+                        });
+    }
+
     @AfterAll
     static void afterAll() {
         if (pythonMcpServerProcess != null) {
@@ -237,7 +250,7 @@ class AgentPlanDeclareMCPServerTest {
     @DisabledOnJre(JRE.JAVA_11)
     @DisplayName("Retrieve MCP tool from AgentPlan - add tool")
     void retrieveMCPToolAdd() throws Exception {
-        Tool tool = (Tool) agentPlan.getResource("add", ResourceType.TOOL);
+        Tool tool = (Tool) resolveResource("add", ResourceType.TOOL);
         assertNotNull(tool);
         assertInstanceOf(MCPTool.class, tool);
 
@@ -259,7 +272,7 @@ class AgentPlanDeclareMCPServerTest {
     @DisabledOnJre(JRE.JAVA_11)
     @DisplayName("Retrieve MCP prompt from AgentPlan - ask_sum")
     void retrieveMCPPromptAskSum() throws Exception {
-        Prompt prompt = (Prompt) agentPlan.getResource("ask_sum", 
ResourceType.PROMPT);
+        Prompt prompt = (Prompt) resolveResource("ask_sum", 
ResourceType.PROMPT);
         assertNotNull(prompt);
         assertInstanceOf(MCPPrompt.class, prompt);
 
@@ -305,7 +318,7 @@ class AgentPlanDeclareMCPServerTest {
     @DisabledOnJre(JRE.JAVA_11)
     @DisplayName("Test metadata from MCP tool - add")
     void testMCPToolMetadata() throws Exception {
-        Tool tool = (Tool) agentPlan.getResource("add", ResourceType.TOOL);
+        Tool tool = (Tool) resolveResource("add", ResourceType.TOOL);
         ToolMetadata metadata = tool.getMetadata();
 
         assertEquals("add", metadata.getName());
diff --git 
a/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanDeclareToolFieldTest.java
 
b/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanDeclareToolFieldTest.java
index 7a96b5db..b45bdde7 100644
--- 
a/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanDeclareToolFieldTest.java
+++ 
b/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanDeclareToolFieldTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.agents.api.agents.Agent;
 import org.apache.flink.agents.api.annotation.Action;
 import org.apache.flink.agents.api.annotation.ToolParam;
 import org.apache.flink.agents.api.context.RunnerContext;
+import org.apache.flink.agents.api.resource.Resource;
 import org.apache.flink.agents.api.resource.ResourceType;
 import org.apache.flink.agents.api.tools.Tool;
 import org.apache.flink.agents.api.tools.ToolMetadata;
@@ -117,6 +118,18 @@ class AgentPlanDeclareToolFieldTest {
         agentPlan = new AgentPlan(new TestAgent());
     }
 
+    /** Resolves a resource directly from its provider, bypassing 
ResourceCache. */
+    private Resource resolveResource(String name, ResourceType type) throws 
Exception {
+        return agentPlan
+                .getResourceProviders()
+                .get(type)
+                .get(name)
+                .provide(
+                        (n, t) -> {
+                            throw new UnsupportedOperationException("No 
dependencies expected");
+                        });
+    }
+
     @Test
     @DisplayName("Extract FunctionTool resources into providers")
     void extractTools() {
@@ -131,7 +144,7 @@ class AgentPlanDeclareToolFieldTest {
     @Test
     @DisplayName("Retrieve FunctionTool and call with parameters")
     void callCalculator() throws Exception {
-        Tool tool = (Tool) agentPlan.getResource("calculator", 
ResourceType.TOOL);
+        Tool tool = (Tool) resolveResource("calculator", ResourceType.TOOL);
         assertInstanceOf(FunctionTool.class, tool);
         ToolResponse r =
                 tool.call(
@@ -148,7 +161,7 @@ class AgentPlanDeclareToolFieldTest {
     @Test
     @DisplayName("Call weather FunctionTool")
     void callWeather() throws Exception {
-        Tool tool = (Tool) agentPlan.getResource("weather", ResourceType.TOOL);
+        Tool tool = (Tool) resolveResource("weather", ResourceType.TOOL);
         assertInstanceOf(FunctionTool.class, tool);
         ToolResponse r =
                 tool.call(
@@ -165,7 +178,7 @@ class AgentPlanDeclareToolFieldTest {
     @Test
     @DisplayName("FunctionTool metadata and schema")
     void metadataSchema() throws Exception {
-        FunctionTool tool = (FunctionTool) agentPlan.getResource("calculator", 
ResourceType.TOOL);
+        FunctionTool tool = (FunctionTool) resolveResource("calculator", 
ResourceType.TOOL);
         ToolMetadata md = tool.getMetadata();
         assertEquals("calculate", md.getName());
         assertEquals("Performs basic arithmetic operations", 
md.getDescription());
@@ -179,7 +192,7 @@ class AgentPlanDeclareToolFieldTest {
     @Test
     @DisplayName("FunctionTool error cases")
     void calculatorErrors() throws Exception {
-        Tool tool = (Tool) agentPlan.getResource("calculator", 
ResourceType.TOOL);
+        Tool tool = (Tool) resolveResource("calculator", ResourceType.TOOL);
         ToolResponse r =
                 tool.call(
                         new ToolParameters(
diff --git 
a/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanDeclareToolMethodTest.java
 
b/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanDeclareToolMethodTest.java
index cbe88b2e..18fc3072 100644
--- 
a/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanDeclareToolMethodTest.java
+++ 
b/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanDeclareToolMethodTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.agents.api.agents.Agent;
 import org.apache.flink.agents.api.annotation.Action;
 import org.apache.flink.agents.api.annotation.ToolParam;
 import org.apache.flink.agents.api.context.RunnerContext;
+import org.apache.flink.agents.api.resource.Resource;
 import org.apache.flink.agents.api.resource.ResourceType;
 import org.apache.flink.agents.api.tools.Tool;
 import org.apache.flink.agents.api.tools.ToolMetadata;
@@ -93,6 +94,18 @@ class AgentPlanDeclareToolMethodTest {
         agentPlan = new AgentPlan(new TestAgent());
     }
 
+    /** Resolves a resource directly from its provider, bypassing 
ResourceCache. */
+    private Resource resolveResource(String name, ResourceType type) throws 
Exception {
+        return agentPlan
+                .getResourceProviders()
+                .get(type)
+                .get(name)
+                .provide(
+                        (n, t) -> {
+                            throw new UnsupportedOperationException("No 
dependencies expected");
+                        });
+    }
+
     @Test
     @DisplayName("Discover static @Tool methods and register providers")
     void discoverTools() {
@@ -104,8 +117,17 @@ class AgentPlanDeclareToolMethodTest {
         assertTrue(toolProviders.containsKey("getWeather"));
     }
 
-    void checkToolCall(AgentPlan agentPlan) throws Exception {
-        Tool calculator = (Tool) agentPlan.getResource("calculate", 
ResourceType.TOOL);
+    void checkToolCall(AgentPlan plan) throws Exception {
+        Tool calculator =
+                (Tool)
+                        plan.getResourceProviders()
+                                .get(ResourceType.TOOL)
+                                .get("calculate")
+                                .provide(
+                                        (n, t) -> {
+                                            throw new 
UnsupportedOperationException(
+                                                    "No dependencies 
expected");
+                                        });
         ToolParameters tp =
                 new ToolParameters(
                         new HashMap<>(
@@ -117,7 +139,16 @@ class AgentPlanDeclareToolMethodTest {
         assertTrue(r.isSuccess());
         assertEquals(45.0, (Double) r.getResult(), 0.001);
 
-        Tool weather = (Tool) agentPlan.getResource("getWeather", 
ResourceType.TOOL);
+        Tool weather =
+                (Tool)
+                        plan.getResourceProviders()
+                                .get(ResourceType.TOOL)
+                                .get("getWeather")
+                                .provide(
+                                        (n, t) -> {
+                                            throw new 
UnsupportedOperationException(
+                                                    "No dependencies 
expected");
+                                        });
         ToolResponse wr =
                 weather.call(
                         new ToolParameters(
@@ -152,13 +183,14 @@ class AgentPlanDeclareToolMethodTest {
                         Tool.fromMethod(
                                 TestAgent.class.getMethod(
                                         "getWeather", String.class, 
String.class)));
-        checkToolCall(new AgentPlan(agent));
+        AgentPlan addedPlan = new AgentPlan(agent);
+        checkToolCall(addedPlan);
     }
 
     @Test
     @DisplayName("Parameter conversion and errors")
     void paramConversionAndErrors() throws Exception {
-        Tool calculator = (Tool) agentPlan.getResource("calculate", 
ResourceType.TOOL);
+        Tool calculator = (Tool) resolveResource("calculate", 
ResourceType.TOOL);
 
         ToolResponse r =
                 calculator.call(
@@ -213,7 +245,7 @@ class AgentPlanDeclareToolMethodTest {
     @Test
     @DisplayName("Metadata and schema shape")
     void metadataSchema() throws Exception {
-        Tool calculator = (Tool) agentPlan.getResource("calculate", 
ResourceType.TOOL);
+        Tool calculator = (Tool) resolveResource("calculate", 
ResourceType.TOOL);
         ToolMetadata md = calculator.getMetadata();
         assertEquals("calculate", md.getName());
         assertEquals("Performs basic arithmetic operations", 
md.getDescription());
@@ -230,7 +262,16 @@ class AgentPlanDeclareToolMethodTest {
         ObjectMapper mapper = new ObjectMapper();
         String json = mapper.writeValueAsString(agentPlan);
         AgentPlan restored = mapper.readValue(json, AgentPlan.class);
-        Tool calculator = (Tool) restored.getResource("calculate", 
ResourceType.TOOL);
+        Tool calculator =
+                (Tool)
+                        restored.getResourceProviders()
+                                .get(ResourceType.TOOL)
+                                .get("calculate")
+                                .provide(
+                                        (n, t) -> {
+                                            throw new 
UnsupportedOperationException(
+                                                    "No dependencies 
expected");
+                                        });
         ToolResponse r =
                 calculator.call(
                         new ToolParameters(
diff --git a/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanTest.java 
b/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanTest.java
index ad513248..71896903 100644
--- a/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanTest.java
+++ b/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanTest.java
@@ -24,8 +24,6 @@ import org.apache.flink.agents.api.OutputEvent;
 import org.apache.flink.agents.api.agents.Agent;
 import org.apache.flink.agents.api.annotation.ChatModelSetup;
 import org.apache.flink.agents.api.annotation.Tool;
-import org.apache.flink.agents.api.chat.messages.ChatMessage;
-import org.apache.flink.agents.api.chat.model.python.PythonChatModelSetup;
 import org.apache.flink.agents.api.context.RunnerContext;
 import org.apache.flink.agents.api.resource.Resource;
 import org.apache.flink.agents.api.resource.ResourceDescriptor;
@@ -33,10 +31,6 @@ import org.apache.flink.agents.api.resource.ResourceType;
 import org.apache.flink.agents.api.resource.SerializableResource;
 import org.apache.flink.agents.api.resource.python.PythonResourceAdapter;
 import org.apache.flink.agents.api.resource.python.PythonResourceWrapper;
-import 
org.apache.flink.agents.api.vectorstores.CollectionManageableVectorStore;
-import org.apache.flink.agents.api.vectorstores.Document;
-import org.apache.flink.agents.api.vectorstores.VectorStoreQuery;
-import org.apache.flink.agents.api.vectorstores.VectorStoreQueryResult;
 import org.apache.flink.agents.plan.actions.Action;
 import 
org.apache.flink.agents.plan.resourceprovider.JavaSerializableResourceProvider;
 import org.apache.flink.agents.plan.resourceprovider.PythonResourceProvider;
@@ -50,7 +44,6 @@ import java.util.Map;
 import java.util.function.BiFunction;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for {@link AgentPlan} constructor that takes an Agent. */
 public class AgentPlanTest {
@@ -178,72 +171,6 @@ public class AgentPlanTest {
         }
     }
 
-    public static class TestPythonResourceAdapter implements 
PythonResourceAdapter {
-
-        @Override
-        public Object getResource(String resourceName, String resourceType) {
-            return null;
-        }
-
-        @Override
-        public PyObject initPythonResource(
-                String module, String clazz, Map<String, Object> kwargs) {
-            return null;
-        }
-
-        @Override
-        public Object toPythonChatMessage(ChatMessage message) {
-            return null;
-        }
-
-        @Override
-        public ChatMessage fromPythonChatMessage(Object pythonChatMessage) {
-            return null;
-        }
-
-        @Override
-        public Object toPythonDocuments(List<Document> documents) {
-            return null;
-        }
-
-        @Override
-        public List<Document> fromPythonDocuments(List<PyObject> 
pythonDocuments) {
-            return List.of();
-        }
-
-        @Override
-        public Object toPythonVectorStoreQuery(VectorStoreQuery query) {
-            return null;
-        }
-
-        @Override
-        public VectorStoreQueryResult fromPythonVectorStoreQueryResult(
-                PyObject pythonVectorStoreQueryResult) {
-            return null;
-        }
-
-        @Override
-        public CollectionManageableVectorStore.Collection fromPythonCollection(
-                PyObject pythonCollection) {
-            return null;
-        }
-
-        @Override
-        public Object 
convertToPythonTool(org.apache.flink.agents.api.tools.Tool tool) {
-            return null;
-        }
-
-        @Override
-        public Object callMethod(Object obj, String methodName, Map<String, 
Object> kwargs) {
-            return null;
-        }
-
-        @Override
-        public Object invoke(String name, Object... args) {
-            return null;
-        }
-    }
-
     @Test
     public void testConstructorWithAgent() throws Exception {
         // Create an agent instance
@@ -371,20 +298,6 @@ public class AgentPlanTest {
                 123, actualPlan.getActionConfigValue("handleMultipleEvents", 
"key"));
     }
 
-    @Test
-    public void testGetResourceNotFound() throws Exception {
-        TestAgent agent = new TestAgent();
-        AgentPlan agentPlan = new AgentPlan(agent);
-
-        // Test getting non-existent resource throws exception
-        try {
-            agentPlan.getResource("non-existent", ResourceType.CHAT_MODEL);
-            assertThat(false).as("Should have thrown 
IllegalArgumentException").isTrue();
-        } catch (IllegalArgumentException e) {
-            assertThat(e.getMessage()).contains("Resource not found: 
non-existent");
-        }
-    }
-
     @Test
     public void testExtractResourceProvidersFromAgent() throws Exception {
         // Create an agent with resource annotations
@@ -444,38 +357,4 @@ public class AgentPlanTest {
         
assertThat(pythonChatModelProvider.getName()).isEqualTo("pythonChatModel");
         
assertThat(pythonChatModelProvider.getType()).isEqualTo(ResourceType.CHAT_MODEL);
     }
-
-    @Test
-    public void testGetResourceFromResourceProvider() throws Exception {
-        // Create an agent with resource annotations
-        TestAgentWithResources agent = new TestAgentWithResources();
-        AgentPlan agentPlan = new AgentPlan(agent);
-
-        // Test getting a tool resource
-        Resource myTool = agentPlan.getResource("myTool", ResourceType.TOOL);
-        assertThat(myTool).isNotNull();
-        assertThat(myTool).isInstanceOf(TestTool.class);
-        assertThat(myTool.getResourceType()).isEqualTo(ResourceType.TOOL);
-
-        // Test getting a chat model resource
-        Resource chatModel = agentPlan.getResource("chatModel", 
ResourceType.CHAT_MODEL);
-        assertThat(chatModel).isNotNull();
-        assertThat(chatModel).isInstanceOf(TestSerializableChatModel.class);
-        
assertThat(chatModel.getResourceType()).isEqualTo(ResourceType.CHAT_MODEL);
-
-        assertThatThrownBy(() -> agentPlan.getResource("pythonChatModel", 
ResourceType.CHAT_MODEL))
-                .isInstanceOf(IllegalStateException.class)
-                .hasMessageContaining("PythonResourceAdapter is not set");
-
-        agentPlan.setPythonResourceAdapter(new TestPythonResourceAdapter());
-        Resource pythonChatModel =
-                agentPlan.getResource("pythonChatModel", 
ResourceType.CHAT_MODEL);
-        assertThat(pythonChatModel).isNotNull();
-        assertThat(pythonChatModel).isInstanceOf(PythonChatModelSetup.class);
-        
assertThat(pythonChatModel.getResourceType()).isEqualTo(ResourceType.CHAT_MODEL);
-
-        // Test that resources are cached (should be the same instance)
-        Resource myToolAgain = agentPlan.getResource("myTool", 
ResourceType.TOOL);
-        assertThat(myTool).isSameAs(myToolAgain);
-    }
 }
diff --git 
a/plan/src/test/java/org/apache/flink/agents/plan/FunctionToolPlanTest.java 
b/plan/src/test/java/org/apache/flink/agents/plan/FunctionToolPlanTest.java
index 4e99464a..a7cef61d 100644
--- a/plan/src/test/java/org/apache/flink/agents/plan/FunctionToolPlanTest.java
+++ b/plan/src/test/java/org/apache/flink/agents/plan/FunctionToolPlanTest.java
@@ -90,7 +90,16 @@ class FunctionToolPlanTest {
     void functionToolAgentPlan() throws Exception {
         AgentPlan plan = new AgentPlan(new TestAgent());
 
-        FunctionTool javaTool = (FunctionTool) plan.getResource("javaTool", 
ResourceType.TOOL);
+        FunctionTool javaTool =
+                (FunctionTool)
+                        plan.getResourceProviders()
+                                .get(ResourceType.TOOL)
+                                .get("javaTool")
+                                .provide(
+                                        (n, t) -> {
+                                            throw new 
UnsupportedOperationException(
+                                                    "No dependencies 
expected");
+                                        });
         ToolResponse ok =
                 javaTool.call(
                         new ToolParameters(
@@ -102,7 +111,16 @@ class FunctionToolPlanTest {
         assertTrue(ok.isSuccess());
         assertEquals(36.0, (Double) ok.getResult(), 1e-9);
 
-        FunctionTool pyTool = (FunctionTool) plan.getResource("pyTool", 
ResourceType.TOOL);
+        FunctionTool pyTool =
+                (FunctionTool)
+                        plan.getResourceProviders()
+                                .get(ResourceType.TOOL)
+                                .get("pyTool")
+                                .provide(
+                                        (n, t) -> {
+                                            throw new 
UnsupportedOperationException(
+                                                    "No dependencies 
expected");
+                                        });
         ToolResponse err = pyTool.call(new ToolParameters(new 
HashMap<>(Map.of("x", 1))));
         assertFalse(err.isSuccess());
     }
diff --git a/python/flink_agents/plan/agent_plan.py 
b/python/flink_agents/plan/agent_plan.py
index daf66128..2f31ff3c 100644
--- a/python/flink_agents/plan/agent_plan.py
+++ b/python/flink_agents/plan/agent_plan.py
@@ -20,7 +20,7 @@ from typing import TYPE_CHECKING, Any, Dict, List, cast
 from pydantic import BaseModel, field_serializer, model_validator
 
 from flink_agents.api.agents.agent import Agent
-from flink_agents.api.resource import Resource, ResourceDescriptor, 
ResourceType
+from flink_agents.api.resource import ResourceDescriptor, ResourceType
 from flink_agents.plan.actions.action import Action
 from flink_agents.plan.actions.chat_model_action import CHAT_MODEL_ACTION
 from flink_agents.plan.actions.context_retrieval_action import 
CONTEXT_RETRIEVAL_ACTION
@@ -59,8 +59,6 @@ class AgentPlan(BaseModel):
     actions_by_event: Dict[str, List[str]]
     resource_providers: Dict[ResourceType, Dict[str, ResourceProvider]] | None 
= None
     config: AgentConfiguration | None = None
-    __resources: Dict[ResourceType, Dict[str, Resource]] = {}
-    __j_resource_adapter: Any = None
 
     @field_serializer("resource_providers")
     def __serialize_resource_providers(
@@ -199,40 +197,6 @@ class AgentPlan(BaseModel):
         """
         return self.actions[action_name].config.get(key, None)
 
-    def get_resource(self, name: str, type: ResourceType) -> Resource:
-        """Get resource from agent plan.
-
-        Parameters
-        ----------
-        name : str
-            The name of the resource.
-        type : ResourceType
-            The type of the resource.
-        """
-        if type not in self.__resources:
-            self.__resources[type] = {}
-        if name not in self.__resources[type]:
-            resource_provider = self.resource_providers[type][name]
-            if isinstance(resource_provider, JavaResourceProvider):
-                
resource_provider.set_java_resource_adapter(self.__j_resource_adapter)
-            resource = resource_provider.provide(
-                get_resource=self.get_resource, config=self.config
-            )
-            self.__resources[type][name] = resource
-        return self.__resources[type][name]
-
-    def set_java_resource_adapter(self, j_resource_adapter: Any) -> None:
-        """Set java resource adapter for java resource provider."""
-        self.__j_resource_adapter = j_resource_adapter
-
-    def close(self) -> None:
-        """Clean up the resources."""
-        for type in self.__resources:
-            for name in self.__resources[type]:
-                self.__resources[type][name].close()
-        self.__resources.clear()
-
-
 
 def _get_actions(agent: Agent) -> List[Action]:
     """Extract all registered agent actions from an agent.
@@ -353,7 +317,7 @@ def _get_resource_providers(agent: Agent, config: 
AgentConfiguration) -> List[Re
         )
 
     for name, descriptor in agent.resources[ResourceType.MCP_SERVER].items():
-        _add_mcp_server(name, resource_providers, descriptor)
+        _add_mcp_server(name, resource_providers, descriptor, config)
 
     for resource_type in [
         ResourceType.CHAT_MODEL,
@@ -382,8 +346,8 @@ def _add_mcp_server(
 
     resource_providers.append(provider)
 
-    def get_resource(name: str, descriptor: ResourceDescriptor) -> Any:
-        """Placeholder."""
+    def get_resource(name: str, type: ResourceType) -> Any:
+        """Placeholder - MCP server construction doesn't need resource 
resolution."""
 
     mcp_server = cast("MCPServer", provider.provide(get_resource=get_resource, 
config=config))
 
diff --git a/python/flink_agents/plan/tests/test_agent_plan.py 
b/python/flink_agents/plan/tests/test_agent_plan.py
index 6050c485..2b022095 100644
--- a/python/flink_agents/plan/tests/test_agent_plan.py
+++ b/python/flink_agents/plan/tests/test_agent_plan.py
@@ -45,6 +45,7 @@ from flink_agents.api.vector_stores.vector_store import (
 from flink_agents.plan.agent_plan import AgentPlan
 from flink_agents.plan.configuration import AgentConfiguration
 from flink_agents.plan.function import PythonFunction
+from flink_agents.runtime.resource_cache import ResourceCache
 
 
 class AgentForTest(Agent):  # noqa D101
@@ -255,7 +256,8 @@ def test_agent_plan_deserialize(agent_plan: AgentPlan) -> 
None:  # noqa: D103
 
 def test_get_resource() -> None:  # noqa: D103
     agent_plan = AgentPlan.from_agent(MyAgent(), AgentConfiguration())
-    mock = agent_plan.get_resource("mock", ResourceType.CHAT_MODEL)
+    cache = ResourceCache(agent_plan.resource_providers, agent_plan.config)
+    mock = cache.get_resource("mock", ResourceType.CHAT_MODEL)
     assert (
         mock.chat(ChatMessage(role=MessageRole.USER, content="")).content
         == "8.8.8.8 mock resource just for testing."
diff --git a/python/flink_agents/runtime/flink_runner_context.py 
b/python/flink_agents/runtime/flink_runner_context.py
index 7591c062..2c593c0f 100644
--- a/python/flink_agents/runtime/flink_runner_context.py
+++ b/python/flink_agents/runtime/flink_runner_context.py
@@ -45,6 +45,7 @@ from 
flink_agents.runtime.memory.vector_store_long_term_memory import (
     VectorStoreLongTermMemory,
 )
 from flink_agents.runtime.python_java_utils import _build_event_log_string
+from flink_agents.runtime.resource_cache import ResourceCache
 
 logger = logging.getLogger(__name__)
 
@@ -204,7 +205,10 @@ class FlinkRunnerContext(RunnerContext):
         """
         self._j_runner_context = j_runner_context
         self.__agent_plan = AgentPlan.model_validate_json(agent_plan_json)
-        self.__agent_plan.set_java_resource_adapter(j_resource_adapter)
+        self.__resource_cache = ResourceCache(
+            self.__agent_plan.resource_providers, self.__agent_plan.config
+        )
+        self.__resource_cache.set_java_resource_adapter(j_resource_adapter)
         self.executor = executor
 
     def set_long_term_memory(self, ltm: InternalBaseLongTermMemory) -> None:
@@ -237,7 +241,7 @@ class FlinkRunnerContext(RunnerContext):
 
     @override
     def get_resource(self, name: str, type: ResourceType, metric_group: 
MetricGroup = None) -> Resource:
-        resource = self.__agent_plan.get_resource(name, type)
+        resource = self.__resource_cache.get_resource(name, type)
         # Bind metric group to the resource
         resource.set_metric_group(metric_group or self.action_metric_group)
         return resource
@@ -503,11 +507,11 @@ class FlinkRunnerContext(RunnerContext):
         if self.long_term_memory is not None:
             self.long_term_memory.close()
 
-        if self.__agent_plan is not None:
+        if self.__resource_cache is not None:
             try:
-                self.__agent_plan.close()
+                self.__resource_cache.close()
             finally:
-                self.__agent_plan = None
+                self.__resource_cache = None
 
 
 def create_flink_runner_context(
diff --git a/python/flink_agents/runtime/local_runner.py 
b/python/flink_agents/runtime/local_runner.py
index e4124439..75a73efb 100644
--- a/python/flink_agents/runtime/local_runner.py
+++ b/python/flink_agents/runtime/local_runner.py
@@ -35,6 +35,7 @@ from flink_agents.plan.agent_plan import AgentPlan
 from flink_agents.plan.configuration import AgentConfiguration
 from flink_agents.runtime.agent_runner import AgentRunner
 from flink_agents.runtime.local_memory_object import LocalMemoryObject
+from flink_agents.runtime.resource_cache import ResourceCache
 
 logging.basicConfig(
     level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
@@ -81,6 +82,9 @@ class LocalRunnerContext(RunnerContext):
             KeyedStream.
         """
         self.__agent_plan = agent_plan
+        self.__resource_cache = ResourceCache(
+            agent_plan.resource_providers, agent_plan.config
+        )
         self.__key = key
         self.events = deque()
         self._sensory_mem_store = {}
@@ -120,7 +124,7 @@ class LocalRunnerContext(RunnerContext):
 
     @override
     def get_resource(self, name: str, type: ResourceType, metric_group: 
MetricGroup = None) -> Resource:
-        return self.__agent_plan.get_resource(name, type)
+        return self.__resource_cache.get_resource(name, type)
 
     @property
     @override
@@ -244,11 +248,11 @@ class LocalRunnerContext(RunnerContext):
 
     def close(self) -> None:
         """Cleanup the resource."""
-        if self.__agent_plan is not None:
+        if self.__resource_cache is not None:
             try:
-                self.__agent_plan.close()
+                self.__resource_cache.close()
             finally:
-                self.__agent_plan = None
+                self.__resource_cache = None
 
 
 class LocalRunner(AgentRunner):
diff --git a/python/flink_agents/runtime/resource_cache.py 
b/python/flink_agents/runtime/resource_cache.py
new file mode 100644
index 00000000..fa0ca617
--- /dev/null
+++ b/python/flink_agents/runtime/resource_cache.py
@@ -0,0 +1,90 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+from typing import Any, Dict
+
+from flink_agents.api.resource import Resource, ResourceType
+from flink_agents.plan.configuration import AgentConfiguration
+from flink_agents.plan.resource_provider import JavaResourceProvider, 
ResourceProvider
+
+
+class ResourceCache:
+    """Lazily resolves and caches Resource instances from ResourceProviders.
+
+    Resources are created on first access via their provider's ``provide()`` 
method
+    and cached for subsequent lookups. Supports recursive dependency 
resolution — a
+    resource can depend on other resources.
+
+    This class is designed for single-threaded access within Flink's mailbox
+    execution model.
+    """
+
+    def __init__(
+        self,
+        resource_providers: Dict[ResourceType, Dict[str, ResourceProvider]],
+        config: AgentConfiguration | None = None,
+    ) -> None:
+        """Create a ResourceCache from the given resource providers and config.
+
+        Parameters
+        ----------
+        resource_providers : Dict[ResourceType, Dict[str, ResourceProvider]]
+            Two-level mapping of resource type to resource name to provider.
+        config : AgentConfiguration | None
+            Agent configuration passed to providers during resource creation.
+        """
+        self._resource_providers = resource_providers or {}
+        self._config = config
+        self._cache: Dict[ResourceType, Dict[str, Resource]] = {}
+        self._j_resource_adapter: Any = None
+
+    def set_java_resource_adapter(self, j_resource_adapter: Any) -> None:
+        """Set Java resource adapter for Java resource providers."""
+        self._j_resource_adapter = j_resource_adapter
+
+    def get_resource(self, name: str, type: ResourceType) -> Resource:
+        """Get resource by name and type, creating it from its provider if not 
cached.
+
+        Parameters
+        ----------
+        name : str
+            The name of the resource.
+        type : ResourceType
+            The type of the resource.
+        """
+        cached = self._cache.get(type, {}).get(name)
+        if cached is not None:
+            return cached
+        providers = self._resource_providers.get(type)
+        if providers is None or name not in providers:
+            msg = f"Resource not found: '{name}' of type {type}"
+            raise KeyError(msg)
+        resource_provider = providers[name]
+        if isinstance(resource_provider, JavaResourceProvider):
+            
resource_provider.set_java_resource_adapter(self._j_resource_adapter)
+        resource = resource_provider.provide(
+            get_resource=self.get_resource, config=self._config
+        )
+        self._cache.setdefault(type, {})[name] = resource
+        return resource
+
+    def close(self) -> None:
+        """Clean up all cached resources."""
+        for typed in self._cache.values():
+            for resource in typed.values():
+                resource.close()
+        self._cache.clear()
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/PythonMCPResourceDiscovery.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/PythonMCPResourceDiscovery.java
new file mode 100644
index 00000000..10e21968
--- /dev/null
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/PythonMCPResourceDiscovery.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.runtime;
+
+import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.api.resource.python.PythonResourceAdapter;
+import org.apache.flink.agents.plan.resource.python.PythonMCPPrompt;
+import org.apache.flink.agents.plan.resource.python.PythonMCPServer;
+import org.apache.flink.agents.plan.resource.python.PythonMCPTool;
+import org.apache.flink.agents.plan.resourceprovider.PythonResourceProvider;
+import org.apache.flink.agents.plan.resourceprovider.ResourceProvider;
+
+import java.util.Map;
+
+import static org.apache.flink.agents.api.resource.ResourceType.MCP_SERVER;
+import static org.apache.flink.agents.api.resource.ResourceType.PROMPT;
+import static org.apache.flink.agents.api.resource.ResourceType.TOOL;
+
+/**
+ * Discovers tools and prompts from Python MCP servers and registers them in a 
ResourceCache.
+ *
+ * <p>Called once during operator initialization after the Python interpreter 
is available.
+ */
+public class PythonMCPResourceDiscovery {
+
+    /**
+     * Initializes Python MCP servers from the resource providers, extracts 
their tools and prompts,
+     * and registers them in the cache.
+     *
+     * @param resourceProviders the resource providers from the agent plan
+     * @param adapter the Python resource adapter
+     * @param cache the resource cache to register discovered resources in
+     * @throws Exception if a Python MCP server fails to initialize
+     */
+    public static void discoverPythonMCPResources(
+            Map<ResourceType, Map<String, ResourceProvider>> resourceProviders,
+            PythonResourceAdapter adapter,
+            ResourceCache cache)
+            throws Exception {
+
+        // Store the adapter on the cache so that future cache.getResource() 
calls on
+        // non-MCP Python resources (e.g. PythonChatModelSetup) will have the 
adapter available.
+        cache.setPythonResourceAdapter(adapter);
+
+        Map<String, ResourceProvider> servers = 
resourceProviders.get(MCP_SERVER);
+        if (servers == null) {
+            return;
+        }
+
+        for (ResourceProvider rp : servers.values()) {
+            if (!(rp instanceof PythonResourceProvider)) {
+                continue;
+            }
+            PythonResourceProvider provider = (PythonResourceProvider) rp;
+            provider.setPythonResourceAdapter(adapter);
+
+            PythonMCPServer server =
+                    (PythonMCPServer)
+                            provider.provide(
+                                    (name, type) -> {
+                                        try {
+                                            return cache.getResource(name, 
type);
+                                        } catch (Exception e) {
+                                            throw new RuntimeException(e);
+                                        }
+                                    });
+
+            for (PythonMCPTool tool : server.listTools()) {
+                cache.put(tool.getName(), TOOL, tool);
+            }
+            for (PythonMCPPrompt prompt : server.listPrompts()) {
+                cache.put(prompt.getName(), PROMPT, prompt);
+            }
+        }
+    }
+}
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/ResourceCache.java 
b/runtime/src/main/java/org/apache/flink/agents/runtime/ResourceCache.java
new file mode 100644
index 00000000..3537a5a7
--- /dev/null
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/ResourceCache.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.runtime;
+
+import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.api.resource.python.PythonResourceAdapter;
+import org.apache.flink.agents.plan.resourceprovider.PythonResourceProvider;
+import org.apache.flink.agents.plan.resourceprovider.ResourceProvider;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Lazily resolves and caches Resource instances from ResourceProviders.
+ *
+ * <p>Resources are created on first access via their provider's {@code 
provide()} method and cached
+ * for subsequent lookups. Supports recursive dependency resolution — a 
resource can depend on other
+ * resources.
+ *
+ * <p>Thread-safe: resource resolution can happen on async pool threads (e.g. 
when {@code
+ * BaseChatModelSetup.chat()} resolves connection, prompt, and tools inside a 
{@code
+ * durableExecuteAsync} callable).
+ */
+public class ResourceCache implements AutoCloseable {
+
+    private final Map<ResourceType, Map<String, ResourceProvider>> 
resourceProviders;
+    private final Map<ResourceType, Map<String, Resource>> cache = new 
ConcurrentHashMap<>();
+    private volatile PythonResourceAdapter pythonResourceAdapter;
+
+    public ResourceCache(Map<ResourceType, Map<String, ResourceProvider>> 
resourceProviders) {
+        // Defensive copy: the cache must not be affected by later mutations 
to the source map.
+        this.resourceProviders = new HashMap<>();
+        for (Map.Entry<ResourceType, Map<String, ResourceProvider>> entry :
+                resourceProviders.entrySet()) {
+            this.resourceProviders.put(entry.getKey(), new 
HashMap<>(entry.getValue()));
+        }
+    }
+
+    void setPythonResourceAdapter(PythonResourceAdapter adapter) {
+        this.pythonResourceAdapter = adapter;
+    }
+
+    /**
+     * Resolves a resource by name and type, creating it from its provider if 
not cached.
+     *
+     * @param name the resource name
+     * @param type the resource type
+     * @return the resource instance
+     * @throws Exception if the resource cannot be found or created
+     */
+    public synchronized Resource getResource(String name, ResourceType type) 
throws Exception {
+        Map<String, Resource> typed = cache.get(type);
+        if (typed != null) {
+            Resource cached = typed.get(name);
+            if (cached != null) {
+                return cached;
+            }
+        }
+
+        Map<String, ResourceProvider> providers = resourceProviders.get(type);
+        if (providers == null || !providers.containsKey(name)) {
+            throw new IllegalArgumentException("Resource not found: " + name + 
" of type " + type);
+        }
+        ResourceProvider provider = providers.get(name);
+
+        if (pythonResourceAdapter != null && provider instanceof 
PythonResourceProvider) {
+            ((PythonResourceProvider) 
provider).setPythonResourceAdapter(pythonResourceAdapter);
+        }
+
+        Resource resource =
+                provider.provide(
+                        (anotherName, anotherType) -> {
+                            try {
+                                return this.getResource(anotherName, 
anotherType);
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+
+        cache.computeIfAbsent(type, k -> new ConcurrentHashMap<>()).put(name, 
resource);
+        return resource;
+    }
+
+    /**
+     * Puts a resource directly into the cache.
+     *
+     * @param name the resource name
+     * @param type the resource type
+     * @param resource the resource instance
+     */
+    public void put(String name, ResourceType type, Resource resource) {
+        cache.computeIfAbsent(type, k -> new ConcurrentHashMap<>()).put(name, 
resource);
+    }
+
+    @Override
+    public void close() throws Exception {
+        Exception firstException = null;
+        for (Map<String, Resource> resources : cache.values()) {
+            for (Resource resource : resources.values()) {
+                try {
+                    resource.close();
+                } catch (Exception e) {
+                    if (firstException == null) {
+                        firstException = e;
+                    } else {
+                        firstException.addSuppressed(e);
+                    }
+                }
+            }
+        }
+        cache.clear();
+        if (firstException != null) {
+            throw firstException;
+        }
+    }
+}
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/context/JavaRunnerContextImpl.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/context/JavaRunnerContextImpl.java
index a18bb30b..ce5f3f11 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/context/JavaRunnerContextImpl.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/context/JavaRunnerContextImpl.java
@@ -19,6 +19,7 @@ package org.apache.flink.agents.runtime.context;
 
 import org.apache.flink.agents.api.context.DurableCallable;
 import org.apache.flink.agents.plan.AgentPlan;
+import org.apache.flink.agents.runtime.ResourceCache;
 import org.apache.flink.agents.runtime.async.ContinuationActionExecutor;
 import org.apache.flink.agents.runtime.async.ContinuationContext;
 import org.apache.flink.agents.runtime.metrics.FlinkAgentsMetricGroupImpl;
@@ -38,9 +39,10 @@ public class JavaRunnerContextImpl extends RunnerContextImpl 
{
             FlinkAgentsMetricGroupImpl agentMetricGroup,
             Runnable mailboxThreadChecker,
             AgentPlan agentPlan,
+            ResourceCache resourceCache,
             String jobIdentifier,
             ContinuationActionExecutor continuationExecutor) {
-        super(agentMetricGroup, mailboxThreadChecker, agentPlan, 
jobIdentifier);
+        super(agentMetricGroup, mailboxThreadChecker, agentPlan, 
resourceCache, jobIdentifier);
         this.continuationExecutor = continuationExecutor;
     }
 
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
index cab790dc..8c7b9fc3 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
@@ -33,6 +33,7 @@ import org.apache.flink.agents.api.resource.ResourceType;
 import org.apache.flink.agents.plan.AgentPlan;
 import org.apache.flink.agents.plan.actions.Action;
 import org.apache.flink.agents.plan.utils.JsonUtils;
+import org.apache.flink.agents.runtime.ResourceCache;
 import org.apache.flink.agents.runtime.actionstate.ActionState;
 import org.apache.flink.agents.runtime.actionstate.CallResult;
 import org.apache.flink.agents.runtime.memory.CachedMemoryStore;
@@ -98,6 +99,7 @@ public class RunnerContextImpl implements RunnerContext {
     protected final FlinkAgentsMetricGroupImpl agentMetricGroup;
     protected final Runnable mailboxThreadChecker;
     protected final AgentPlan agentPlan;
+    protected final ResourceCache resourceCache;
 
     protected MemoryContext memoryContext;
     protected String actionName;
@@ -110,10 +112,12 @@ public class RunnerContextImpl implements RunnerContext {
             FlinkAgentsMetricGroupImpl agentMetricGroup,
             Runnable mailboxThreadChecker,
             AgentPlan agentPlan,
+            ResourceCache resourceCache,
             String jobIdentifier) {
         this.agentMetricGroup = agentMetricGroup;
         this.mailboxThreadChecker = mailboxThreadChecker;
         this.agentPlan = agentPlan;
+        this.resourceCache = resourceCache;
 
         LongTermMemoryOptions.LongTermMemoryBackend backend =
                 this.getConfig().get(LongTermMemoryOptions.BACKEND);
@@ -221,10 +225,10 @@ public class RunnerContextImpl implements RunnerContext {
 
     @Override
     public Resource getResource(String name, ResourceType type) throws 
Exception {
-        if (agentPlan == null) {
-            throw new IllegalStateException("AgentPlan is not available in 
this context");
+        if (resourceCache == null) {
+            throw new IllegalStateException("ResourceCache is not available in 
this context");
         }
-        Resource resource = agentPlan.getResource(name, type);
+        Resource resource = resourceCache.getResource(name, type);
         // Set current action's metric group to the resource
         resource.setMetricGroup(getActionMetricGroup());
         return resource;
@@ -394,8 +398,6 @@ public class RunnerContextImpl implements RunnerContext {
             this.ltm.close();
             this.ltm = null;
         }
-
-        this.agentPlan.close();
     }
 
     public String getActionName() {
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
index 89e35131..e5015a3a 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
@@ -35,6 +35,8 @@ import org.apache.flink.agents.plan.JavaFunction;
 import org.apache.flink.agents.plan.PythonFunction;
 import org.apache.flink.agents.plan.actions.Action;
 import org.apache.flink.agents.plan.resourceprovider.PythonResourceProvider;
+import org.apache.flink.agents.runtime.PythonMCPResourceDiscovery;
+import org.apache.flink.agents.runtime.ResourceCache;
 import org.apache.flink.agents.runtime.actionstate.ActionState;
 import org.apache.flink.agents.runtime.actionstate.ActionStateStore;
 import org.apache.flink.agents.runtime.actionstate.KafkaActionStateStore;
@@ -131,6 +133,8 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
 
     private final AgentPlan agentPlan;
 
+    private transient ResourceCache resourceCache;
+
     private final Boolean inputIsJava;
 
     private transient StreamRecord<OUT> reusedStreamRecord;
@@ -266,6 +270,8 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
                         TypeInformation.of(MemoryObjectImpl.MemoryItem.class));
         shortTermMemState = 
getRuntimeContext().getMapState(shortTermMemStateDescriptor);
 
+        resourceCache = new ResourceCache(agentPlan.getResourceProviders());
+
         metricGroup = new FlinkAgentsMetricGroupImpl(getMetricGroup());
         builtInMetrics = new BuiltInMetrics(metricGroup, agentPlan);
 
@@ -606,7 +612,7 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
 
     private Resource getResource(String name, ResourceType type) {
         try {
-            return agentPlan.getResource(name, type);
+            return resourceCache.getResource(name, type);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -651,6 +657,7 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
                             this.metricGroup,
                             this::checkMailboxThread,
                             this.agentPlan,
+                            this.resourceCache,
                             this.jobIdentifier);
 
             javaResourceAdapter = new JavaResourceAdapter(this::getResource, 
pythonInterpreter);
@@ -679,7 +686,7 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
                 new PythonResourceAdapterImpl(
                         (String anotherName, ResourceType anotherType) -> {
                             try {
-                                return agentPlan.getResource(anotherName, 
anotherType);
+                                return resourceCache.getResource(anotherName, 
anotherType);
                             } catch (Exception e) {
                                 throw new RuntimeException(e);
                             }
@@ -687,7 +694,8 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
                         pythonInterpreter,
                         javaResourceAdapter);
         pythonResourceAdapter.open();
-        agentPlan.setPythonResourceAdapter(pythonResourceAdapter);
+        PythonMCPResourceDiscovery.discoverPythonMCPResources(
+                agentPlan.getResourceProviders(), pythonResourceAdapter, 
resourceCache);
     }
 
     @Override
@@ -704,6 +712,10 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
 
     @Override
     public void close() throws Exception {
+        // Must close before pythonInterpreter since cached resources may hold 
Python references.
+        if (resourceCache != null) {
+            resourceCache.close();
+        }
         if (runnerContext != null) {
             try {
                 runnerContext.close();
@@ -726,9 +738,6 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
         if (actionStateStore != null) {
             actionStateStore.close();
         }
-        if (runnerContext != null) {
-            runnerContext.close();
-        }
         if (continuationActionExecutor != null) {
             continuationActionExecutor.close();
         }
@@ -1096,6 +1105,7 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
                                 this.metricGroup,
                                 this::checkMailboxThread,
                                 this.agentPlan,
+                                this.resourceCache,
                                 this.jobIdentifier,
                                 continuationActionExecutor);
             }
@@ -1107,6 +1117,7 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
                                 this.metricGroup,
                                 this::checkMailboxThread,
                                 this.agentPlan,
+                                this.resourceCache,
                                 jobIdentifier);
             }
             return pythonRunnerContext;
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java
index 7bfea4ad..e9741baf 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java
@@ -21,6 +21,7 @@ package org.apache.flink.agents.runtime.python.context;
 import org.apache.flink.agents.api.Event;
 import org.apache.flink.agents.api.context.RunnerContext;
 import org.apache.flink.agents.plan.AgentPlan;
+import org.apache.flink.agents.runtime.ResourceCache;
 import org.apache.flink.agents.runtime.context.RunnerContextImpl;
 import org.apache.flink.agents.runtime.metrics.FlinkAgentsMetricGroupImpl;
 import org.apache.flink.agents.runtime.python.event.PythonEvent;
@@ -42,8 +43,9 @@ public class PythonRunnerContextImpl extends 
RunnerContextImpl {
             FlinkAgentsMetricGroupImpl agentMetricGroup,
             Runnable mailboxThreadChecker,
             AgentPlan agentPlan,
+            ResourceCache resourceCache,
             String jobIdentifier) {
-        super(agentMetricGroup, mailboxThreadChecker, agentPlan, 
jobIdentifier);
+        super(agentMetricGroup, mailboxThreadChecker, agentPlan, 
resourceCache, jobIdentifier);
     }
 
     @Override
diff --git 
a/runtime/src/test/java/org/apache/flink/agents/runtime/ResourceCacheTest.java 
b/runtime/src/test/java/org/apache/flink/agents/runtime/ResourceCacheTest.java
new file mode 100644
index 00000000..162c001f
--- /dev/null
+++ 
b/runtime/src/test/java/org/apache/flink/agents/runtime/ResourceCacheTest.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.runtime;
+
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.agents.Agent;
+import org.apache.flink.agents.api.annotation.ChatModelSetup;
+import org.apache.flink.agents.api.annotation.Tool;
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.chat.model.python.PythonChatModelSetup;
+import org.apache.flink.agents.api.context.RunnerContext;
+import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.api.resource.SerializableResource;
+import org.apache.flink.agents.api.resource.python.PythonResourceAdapter;
+import org.apache.flink.agents.api.resource.python.PythonResourceWrapper;
+import 
org.apache.flink.agents.api.vectorstores.CollectionManageableVectorStore;
+import org.apache.flink.agents.api.vectorstores.Document;
+import org.apache.flink.agents.api.vectorstores.VectorStoreQuery;
+import org.apache.flink.agents.api.vectorstores.VectorStoreQueryResult;
+import org.apache.flink.agents.plan.AgentPlan;
+import org.junit.jupiter.api.Test;
+import pemja.core.object.PyObject;
+
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link ResourceCache}. */
+public class ResourceCacheTest {
+
+    /** Test tool resource class. */
+    public static class TestTool extends SerializableResource {
+        private final String name;
+
+        public TestTool(String name) {
+            this.name = name;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        @Override
+        public ResourceType getResourceType() {
+            return ResourceType.TOOL;
+        }
+    }
+
+    /** Test serializable chat model resource class. */
+    public static class TestSerializableChatModel extends SerializableResource 
{
+        private final String name;
+
+        public TestSerializableChatModel(String name) {
+            this.name = name;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        @Override
+        public ResourceType getResourceType() {
+            return ResourceType.CHAT_MODEL;
+        }
+    }
+
+    public static class TestPythonResource extends Resource implements 
PythonResourceWrapper {
+
+        public TestPythonResource(
+                PythonResourceAdapter adapter,
+                PyObject chatModel,
+                ResourceDescriptor descriptor,
+                BiFunction<String, ResourceType, Resource> getResource) {
+            super(descriptor, getResource);
+        }
+
+        @Override
+        public ResourceType getResourceType() {
+            return ResourceType.CHAT_MODEL;
+        }
+
+        @Override
+        public Object getPythonResource() {
+            return null;
+        }
+    }
+
+    /** Test agent class with resource annotations. */
+    public static class TestAgentWithResources extends Agent {
+
+        @Tool private TestTool myTool = new TestTool("myTool");
+
+        @ChatModelSetup
+        private TestSerializableChatModel chatModel =
+                new TestSerializableChatModel("defaultChatModel");
+
+        @ChatModelSetup
+        public static ResourceDescriptor pythonChatModel() {
+            return 
ResourceDescriptor.Builder.newBuilder(TestPythonResource.class.getName())
+                    .addInitialArgument("pythonClazz", "test.module.TestClazz")
+                    .build();
+        }
+
+        @Tool private TestTool anotherTool = new TestTool("anotherTool");
+
+        @org.apache.flink.agents.api.annotation.Action(listenEvents = 
{InputEvent.class})
+        public void handleInputEvent(InputEvent event, RunnerContext context) 
{}
+    }
+
+    public static class TestPythonResourceAdapter implements 
PythonResourceAdapter {
+
+        @Override
+        public Object getResource(String resourceName, String resourceType) {
+            return null;
+        }
+
+        @Override
+        public PyObject initPythonResource(
+                String module, String clazz, Map<String, Object> kwargs) {
+            return null;
+        }
+
+        @Override
+        public Object toPythonChatMessage(ChatMessage message) {
+            return null;
+        }
+
+        @Override
+        public ChatMessage fromPythonChatMessage(Object pythonChatMessage) {
+            return null;
+        }
+
+        @Override
+        public Object toPythonDocuments(List<Document> documents) {
+            return null;
+        }
+
+        @Override
+        public List<Document> fromPythonDocuments(List<PyObject> 
pythonDocuments) {
+            return List.of();
+        }
+
+        @Override
+        public Object toPythonVectorStoreQuery(VectorStoreQuery query) {
+            return null;
+        }
+
+        @Override
+        public VectorStoreQueryResult fromPythonVectorStoreQueryResult(
+                PyObject pythonVectorStoreQueryResult) {
+            return null;
+        }
+
+        @Override
+        public CollectionManageableVectorStore.Collection fromPythonCollection(
+                PyObject pythonCollection) {
+            return null;
+        }
+
+        @Override
+        public Object 
convertToPythonTool(org.apache.flink.agents.api.tools.Tool tool) {
+            return null;
+        }
+
+        @Override
+        public Object callMethod(Object obj, String methodName, Map<String, 
Object> kwargs) {
+            return null;
+        }
+
+        @Override
+        public Object invoke(String name, Object... args) {
+            return null;
+        }
+    }
+
+    @Test
+    public void testGetResourceNotFound() throws Exception {
+        Agent agent = new Agent();
+        AgentPlan agentPlan = new AgentPlan(agent);
+        ResourceCache cache = new 
ResourceCache(agentPlan.getResourceProviders());
+
+        assertThatThrownBy(() -> cache.getResource("non-existent", 
ResourceType.CHAT_MODEL))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("Resource not found: non-existent");
+    }
+
+    @Test
+    public void testGetResourceFromResourceProvider() throws Exception {
+        TestAgentWithResources agent = new TestAgentWithResources();
+        AgentPlan agentPlan = new AgentPlan(agent);
+        ResourceCache cache = new 
ResourceCache(agentPlan.getResourceProviders());
+
+        // Test getting a tool resource
+        Resource myTool = cache.getResource("myTool", ResourceType.TOOL);
+        assertThat(myTool).isNotNull();
+        assertThat(myTool).isInstanceOf(TestTool.class);
+        assertThat(myTool.getResourceType()).isEqualTo(ResourceType.TOOL);
+
+        // Test getting a chat model resource
+        Resource chatModel = cache.getResource("chatModel", 
ResourceType.CHAT_MODEL);
+        assertThat(chatModel).isNotNull();
+        assertThat(chatModel).isInstanceOf(TestSerializableChatModel.class);
+        
assertThat(chatModel.getResourceType()).isEqualTo(ResourceType.CHAT_MODEL);
+
+        assertThatThrownBy(() -> cache.getResource("pythonChatModel", 
ResourceType.CHAT_MODEL))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("PythonResourceAdapter is not set");
+
+        PythonMCPResourceDiscovery.discoverPythonMCPResources(
+                agentPlan.getResourceProviders(), new 
TestPythonResourceAdapter(), cache);
+        Resource pythonChatModel = cache.getResource("pythonChatModel", 
ResourceType.CHAT_MODEL);
+        assertThat(pythonChatModel).isNotNull();
+        assertThat(pythonChatModel).isInstanceOf(PythonChatModelSetup.class);
+        
assertThat(pythonChatModel.getResourceType()).isEqualTo(ResourceType.CHAT_MODEL);
+
+        // Test that resources are cached (should be the same instance)
+        Resource myToolAgain = cache.getResource("myTool", ResourceType.TOOL);
+        assertThat(myTool).isSameAs(myToolAgain);
+    }
+}

Reply via email to