This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push:
new f8eac10b [plan] Extract ResourceCache and PythonResourceBridge from
AgentPlan (#548)
f8eac10b is described below
commit f8eac10b870ad62ab1799fe9d7d35cc25d40185c
Author: Weiqing Yang <[email protected]>
AuthorDate: Thu Apr 2 03:02:16 2026 -0700
[plan] Extract ResourceCache and PythonResourceBridge from AgentPlan (#548)
---
.../org/apache/flink/agents/plan/AgentPlan.java | 119 ----------
.../agents/plan/AgentPlanDeclareChatModelTest.java | 38 +++-
.../agents/plan/AgentPlanDeclareMCPServerTest.java | 19 +-
.../agents/plan/AgentPlanDeclareToolFieldTest.java | 21 +-
.../plan/AgentPlanDeclareToolMethodTest.java | 55 ++++-
.../apache/flink/agents/plan/AgentPlanTest.java | 121 -----------
.../flink/agents/plan/FunctionToolPlanTest.java | 22 +-
python/flink_agents/plan/agent_plan.py | 44 +---
python/flink_agents/plan/tests/test_agent_plan.py | 4 +-
.../flink_agents/runtime/flink_runner_context.py | 14 +-
python/flink_agents/runtime/local_runner.py | 12 +-
python/flink_agents/runtime/resource_cache.py | 90 ++++++++
.../agents/runtime/PythonMCPResourceDiscovery.java | 92 ++++++++
.../apache/flink/agents/runtime/ResourceCache.java | 134 ++++++++++++
.../runtime/context/JavaRunnerContextImpl.java | 4 +-
.../agents/runtime/context/RunnerContextImpl.java | 12 +-
.../runtime/operator/ActionExecutionOperator.java | 23 +-
.../python/context/PythonRunnerContextImpl.java | 4 +-
.../flink/agents/runtime/ResourceCacheTest.java | 241 +++++++++++++++++++++
19 files changed, 744 insertions(+), 325 deletions(-)
diff --git a/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
b/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
index 0641e819..054b2ed2 100644
--- a/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
+++ b/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
@@ -28,7 +28,6 @@ import org.apache.flink.agents.api.resource.Resource;
import org.apache.flink.agents.api.resource.ResourceDescriptor;
import org.apache.flink.agents.api.resource.ResourceType;
import org.apache.flink.agents.api.resource.SerializableResource;
-import org.apache.flink.agents.api.resource.python.PythonResourceAdapter;
import org.apache.flink.agents.api.resource.python.PythonResourceWrapper;
import org.apache.flink.agents.api.tools.ToolMetadata;
import org.apache.flink.agents.plan.actions.Action;
@@ -60,7 +59,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import static org.apache.flink.agents.api.resource.ResourceType.MCP_SERVER;
@@ -86,17 +84,11 @@ public class AgentPlan implements Serializable {
private AgentConfiguration config;
- private transient PythonResourceAdapter pythonResourceAdapter;
-
- /** Cache for instantiated resources. */
- private transient Map<ResourceType, Map<String, Resource>> resourceCache;
-
public AgentPlan(Map<String, Action> actions, Map<String, List<Action>>
actionsByEvent) {
this.actions = actions;
this.actionsByEvent = actionsByEvent;
this.resourceProviders = new HashMap<>();
this.config = new AgentConfiguration();
- this.resourceCache = new ConcurrentHashMap<>();
}
public AgentPlan(
@@ -106,7 +98,6 @@ public class AgentPlan implements Serializable {
this.actions = actions;
this.actionsByEvent = actionsByEvent;
this.resourceProviders = resourceProviders;
- this.resourceCache = new ConcurrentHashMap<>();
this.config = new AgentConfiguration();
}
@@ -118,7 +109,6 @@ public class AgentPlan implements Serializable {
this.actions = actions;
this.actionsByEvent = actionsByEvent;
this.resourceProviders = resourceProviders;
- this.resourceCache = new ConcurrentHashMap<>();
this.config = config;
}
@@ -140,62 +130,6 @@ public class AgentPlan implements Serializable {
this.config = config;
}
- public void setPythonResourceAdapter(PythonResourceAdapter adapter) throws
Exception {
- this.pythonResourceAdapter = adapter;
- Map<String, ResourceProvider> servers =
resourceProviders.get(MCP_SERVER);
- if (servers == null) {
- return;
- }
- servers.values().stream()
- .filter(PythonResourceProvider.class::isInstance)
- .map(PythonResourceProvider.class::cast)
- .forEach(
- provider -> {
- provider.setPythonResourceAdapter(adapter);
-
- // Get tools and prompts from server
- try {
- PythonMCPServer server =
- (PythonMCPServer)
- provider.provide(
- (String anotherName,
- ResourceType
anotherType) -> {
- try {
- return
this.getResource(
-
anotherName, anotherType);
- } catch (Exception
e) {
- throw new
RuntimeException(e);
- }
- });
-
- // Add tools to cache
- server.listTools()
- .forEach(
- tool ->
- resourceCache
-
.computeIfAbsent(
- TOOL,
- k ->
-
new ConcurrentHashMap<>())
-
.put(tool.getName(), tool));
-
- // Add prompts to cache
- server.listPrompts()
- .forEach(
- prompt ->
- resourceCache
-
.computeIfAbsent(
- PROMPT,
- k ->
-
new ConcurrentHashMap<>())
-
.put(prompt.getName(), prompt));
- } catch (Exception e) {
- throw new RuntimeException(
- "Failed to process Python MCP server
in Java", e);
- }
- });
- }
-
public Map<String, Action> getActions() {
return actions;
}
@@ -220,49 +154,6 @@ public class AgentPlan implements Serializable {
return actionsByEvent.get(eventType);
}
- /**
- * Get resource from agent plan.
- *
- * @param name the resource name
- * @param type the resource type
- * @return the resource instance
- * @throws Exception if the resource cannot be found or created
- */
- public Resource getResource(String name, ResourceType type) throws
Exception {
- // Check cache first
- if (resourceCache.containsKey(type) &&
resourceCache.get(type).containsKey(name)) {
- return resourceCache.get(type).get(name);
- }
-
- // Get resource provider
- if (!resourceProviders.containsKey(type)
- || !resourceProviders.get(type).containsKey(name)) {
- throw new IllegalArgumentException("Resource not found: " + name +
" of type " + type);
- }
-
- ResourceProvider provider = resourceProviders.get(type).get(name);
-
- if (pythonResourceAdapter != null && provider instanceof
PythonResourceProvider) {
- ((PythonResourceProvider)
provider).setPythonResourceAdapter(pythonResourceAdapter);
- }
-
- // Create resource using provider
- Resource resource =
- provider.provide(
- (String anotherName, ResourceType anotherType) -> {
- try {
- return this.getResource(anotherName,
anotherType);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- });
-
- // Cache the resource
- resourceCache.computeIfAbsent(type, k -> new
ConcurrentHashMap<>()).put(name, resource);
-
- return resource;
- }
-
public AgentConfiguration getConfig() {
return config;
}
@@ -271,15 +162,6 @@ public class AgentPlan implements Serializable {
return config.getConfData();
}
- public void close() throws Exception {
- for (Map<String, Resource> resources : resourceCache.values()) {
- for (Resource resource : resources.values()) {
- resource.close();
- }
- }
- resourceCache.clear();
- }
-
private void writeObject(ObjectOutputStream out) throws IOException {
String serializedStr = new ObjectMapper().writeValueAsString(this);
out.writeUTF(serializedStr);
@@ -292,7 +174,6 @@ public class AgentPlan implements Serializable {
this.actionsByEvent = agentPlan.getActionsByEvent();
this.resourceProviders = agentPlan.getResourceProviders();
this.config = agentPlan.getConfig();
- this.resourceCache = new ConcurrentHashMap<>();
}
private void extractActions(
diff --git
a/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanDeclareChatModelTest.java
b/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanDeclareChatModelTest.java
index a829deff..6f1ecb60 100644
---
a/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanDeclareChatModelTest.java
+++
b/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanDeclareChatModelTest.java
@@ -93,6 +93,18 @@ class AgentPlanDeclareChatModelTest {
agentPlan = new AgentPlan(new ChatAgent());
}
+ /** Resolves a resource directly from its provider. */
+ private Resource resolveResource(String name, ResourceType type) throws
Exception {
+ return agentPlan
+ .getResourceProviders()
+ .get(type)
+ .get(name)
+ .provide(
+ (n, t) -> {
+ throw new UnsupportedOperationException("No
dependencies expected");
+ });
+ }
+
@Test
@DisplayName("Discover @ChatModel in AgentPlan resource providers")
void discoverChatModel() {
@@ -107,8 +119,7 @@ class AgentPlanDeclareChatModelTest {
@DisplayName("Retrieve chat model and invoke chat(Prompt)")
void retrieveAndChat() throws Exception {
BaseChatModelSetup model =
- (BaseChatModelSetup)
- agentPlan.getResource("testChatModel",
ResourceType.CHAT_MODEL);
+ (BaseChatModelSetup) resolveResource("testChatModel",
ResourceType.CHAT_MODEL);
assertNotNull(model);
Prompt prompt = Prompt.fromText("Hello world");
@@ -126,7 +137,15 @@ class AgentPlanDeclareChatModelTest {
AgentPlan restored = mapper.readValue(json, AgentPlan.class);
BaseChatModelSetup model =
- (BaseChatModelSetup) restored.getResource("testChatModel",
ResourceType.CHAT_MODEL);
+ (BaseChatModelSetup)
+ restored.getResourceProviders()
+ .get(ResourceType.CHAT_MODEL)
+ .get("testChatModel")
+ .provide(
+ (n, t) -> {
+ throw new
UnsupportedOperationException(
+ "No dependencies
expected");
+ });
ChatMessage reply =
model.chat(Prompt.fromText("Hi").formatMessages(MessageRole.USER, new
HashMap<>()));
assertEquals("ok:Hi", reply.getContent());
@@ -148,10 +167,17 @@ class AgentPlanDeclareChatModelTest {
AgentPlan actualPlan = new AgentPlan(agent);
BaseChatModelSetup actualChatModel =
(BaseChatModelSetup)
- actualPlan.getResource("testChatModel",
ResourceType.CHAT_MODEL);
+ actualPlan
+ .getResourceProviders()
+ .get(ResourceType.CHAT_MODEL)
+ .get("testChatModel")
+ .provide(
+ (n, t) -> {
+ throw new
UnsupportedOperationException(
+ "No dependencies
expected");
+ });
BaseChatModelSetup expectedChatModel =
- (BaseChatModelSetup)
- agentPlan.getResource("testChatModel",
ResourceType.CHAT_MODEL);
+ (BaseChatModelSetup) resolveResource("testChatModel",
ResourceType.CHAT_MODEL);
Assertions.assertEquals(expectedChatModel.getClass(),
actualChatModel.getClass());
Assertions.assertEquals(expectedChatModel.getConnection(),
actualChatModel.getConnection());
Assertions.assertEquals(expectedChatModel.getModel(),
actualChatModel.getModel());
diff --git
a/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanDeclareMCPServerTest.java
b/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanDeclareMCPServerTest.java
index 09d9e6a7..e3c4fe2e 100644
---
a/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanDeclareMCPServerTest.java
+++
b/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanDeclareMCPServerTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.agents.api.agents.Agent;
import org.apache.flink.agents.api.annotation.Action;
import org.apache.flink.agents.api.context.RunnerContext;
import org.apache.flink.agents.api.prompt.Prompt;
+import org.apache.flink.agents.api.resource.Resource;
import org.apache.flink.agents.api.resource.ResourceDescriptor;
import org.apache.flink.agents.api.resource.ResourceName;
import org.apache.flink.agents.api.resource.ResourceType;
@@ -184,6 +185,18 @@ class AgentPlanDeclareMCPServerTest {
agentPlan = new AgentPlan(new TestMCPAgent());
}
+ /** Resolves a resource directly from its provider. */
+ private Resource resolveResource(String name, ResourceType type) throws
Exception {
+ return agentPlan
+ .getResourceProviders()
+ .get(type)
+ .get(name)
+ .provide(
+ (n, t) -> {
+ throw new UnsupportedOperationException("No
dependencies expected");
+ });
+ }
+
@AfterAll
static void afterAll() {
if (pythonMcpServerProcess != null) {
@@ -237,7 +250,7 @@ class AgentPlanDeclareMCPServerTest {
@DisabledOnJre(JRE.JAVA_11)
@DisplayName("Retrieve MCP tool from AgentPlan - add tool")
void retrieveMCPToolAdd() throws Exception {
- Tool tool = (Tool) agentPlan.getResource("add", ResourceType.TOOL);
+ Tool tool = (Tool) resolveResource("add", ResourceType.TOOL);
assertNotNull(tool);
assertInstanceOf(MCPTool.class, tool);
@@ -259,7 +272,7 @@ class AgentPlanDeclareMCPServerTest {
@DisabledOnJre(JRE.JAVA_11)
@DisplayName("Retrieve MCP prompt from AgentPlan - ask_sum")
void retrieveMCPPromptAskSum() throws Exception {
- Prompt prompt = (Prompt) agentPlan.getResource("ask_sum",
ResourceType.PROMPT);
+ Prompt prompt = (Prompt) resolveResource("ask_sum",
ResourceType.PROMPT);
assertNotNull(prompt);
assertInstanceOf(MCPPrompt.class, prompt);
@@ -305,7 +318,7 @@ class AgentPlanDeclareMCPServerTest {
@DisabledOnJre(JRE.JAVA_11)
@DisplayName("Test metadata from MCP tool - add")
void testMCPToolMetadata() throws Exception {
- Tool tool = (Tool) agentPlan.getResource("add", ResourceType.TOOL);
+ Tool tool = (Tool) resolveResource("add", ResourceType.TOOL);
ToolMetadata metadata = tool.getMetadata();
assertEquals("add", metadata.getName());
diff --git
a/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanDeclareToolFieldTest.java
b/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanDeclareToolFieldTest.java
index 7a96b5db..b45bdde7 100644
---
a/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanDeclareToolFieldTest.java
+++
b/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanDeclareToolFieldTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.agents.api.agents.Agent;
import org.apache.flink.agents.api.annotation.Action;
import org.apache.flink.agents.api.annotation.ToolParam;
import org.apache.flink.agents.api.context.RunnerContext;
+import org.apache.flink.agents.api.resource.Resource;
import org.apache.flink.agents.api.resource.ResourceType;
import org.apache.flink.agents.api.tools.Tool;
import org.apache.flink.agents.api.tools.ToolMetadata;
@@ -117,6 +118,18 @@ class AgentPlanDeclareToolFieldTest {
agentPlan = new AgentPlan(new TestAgent());
}
+ /** Resolves a resource directly from its provider, bypassing
ResourceCache. */
+ private Resource resolveResource(String name, ResourceType type) throws
Exception {
+ return agentPlan
+ .getResourceProviders()
+ .get(type)
+ .get(name)
+ .provide(
+ (n, t) -> {
+ throw new UnsupportedOperationException("No
dependencies expected");
+ });
+ }
+
@Test
@DisplayName("Extract FunctionTool resources into providers")
void extractTools() {
@@ -131,7 +144,7 @@ class AgentPlanDeclareToolFieldTest {
@Test
@DisplayName("Retrieve FunctionTool and call with parameters")
void callCalculator() throws Exception {
- Tool tool = (Tool) agentPlan.getResource("calculator",
ResourceType.TOOL);
+ Tool tool = (Tool) resolveResource("calculator", ResourceType.TOOL);
assertInstanceOf(FunctionTool.class, tool);
ToolResponse r =
tool.call(
@@ -148,7 +161,7 @@ class AgentPlanDeclareToolFieldTest {
@Test
@DisplayName("Call weather FunctionTool")
void callWeather() throws Exception {
- Tool tool = (Tool) agentPlan.getResource("weather", ResourceType.TOOL);
+ Tool tool = (Tool) resolveResource("weather", ResourceType.TOOL);
assertInstanceOf(FunctionTool.class, tool);
ToolResponse r =
tool.call(
@@ -165,7 +178,7 @@ class AgentPlanDeclareToolFieldTest {
@Test
@DisplayName("FunctionTool metadata and schema")
void metadataSchema() throws Exception {
- FunctionTool tool = (FunctionTool) agentPlan.getResource("calculator",
ResourceType.TOOL);
+ FunctionTool tool = (FunctionTool) resolveResource("calculator",
ResourceType.TOOL);
ToolMetadata md = tool.getMetadata();
assertEquals("calculate", md.getName());
assertEquals("Performs basic arithmetic operations",
md.getDescription());
@@ -179,7 +192,7 @@ class AgentPlanDeclareToolFieldTest {
@Test
@DisplayName("FunctionTool error cases")
void calculatorErrors() throws Exception {
- Tool tool = (Tool) agentPlan.getResource("calculator",
ResourceType.TOOL);
+ Tool tool = (Tool) resolveResource("calculator", ResourceType.TOOL);
ToolResponse r =
tool.call(
new ToolParameters(
diff --git
a/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanDeclareToolMethodTest.java
b/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanDeclareToolMethodTest.java
index cbe88b2e..18fc3072 100644
---
a/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanDeclareToolMethodTest.java
+++
b/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanDeclareToolMethodTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.agents.api.agents.Agent;
import org.apache.flink.agents.api.annotation.Action;
import org.apache.flink.agents.api.annotation.ToolParam;
import org.apache.flink.agents.api.context.RunnerContext;
+import org.apache.flink.agents.api.resource.Resource;
import org.apache.flink.agents.api.resource.ResourceType;
import org.apache.flink.agents.api.tools.Tool;
import org.apache.flink.agents.api.tools.ToolMetadata;
@@ -93,6 +94,18 @@ class AgentPlanDeclareToolMethodTest {
agentPlan = new AgentPlan(new TestAgent());
}
+ /** Resolves a resource directly from its provider, bypassing
ResourceCache. */
+ private Resource resolveResource(String name, ResourceType type) throws
Exception {
+ return agentPlan
+ .getResourceProviders()
+ .get(type)
+ .get(name)
+ .provide(
+ (n, t) -> {
+ throw new UnsupportedOperationException("No
dependencies expected");
+ });
+ }
+
@Test
@DisplayName("Discover static @Tool methods and register providers")
void discoverTools() {
@@ -104,8 +117,17 @@ class AgentPlanDeclareToolMethodTest {
assertTrue(toolProviders.containsKey("getWeather"));
}
- void checkToolCall(AgentPlan agentPlan) throws Exception {
- Tool calculator = (Tool) agentPlan.getResource("calculate",
ResourceType.TOOL);
+ void checkToolCall(AgentPlan plan) throws Exception {
+ Tool calculator =
+ (Tool)
+ plan.getResourceProviders()
+ .get(ResourceType.TOOL)
+ .get("calculate")
+ .provide(
+ (n, t) -> {
+ throw new
UnsupportedOperationException(
+ "No dependencies
expected");
+ });
ToolParameters tp =
new ToolParameters(
new HashMap<>(
@@ -117,7 +139,16 @@ class AgentPlanDeclareToolMethodTest {
assertTrue(r.isSuccess());
assertEquals(45.0, (Double) r.getResult(), 0.001);
- Tool weather = (Tool) agentPlan.getResource("getWeather",
ResourceType.TOOL);
+ Tool weather =
+ (Tool)
+ plan.getResourceProviders()
+ .get(ResourceType.TOOL)
+ .get("getWeather")
+ .provide(
+ (n, t) -> {
+ throw new
UnsupportedOperationException(
+ "No dependencies
expected");
+ });
ToolResponse wr =
weather.call(
new ToolParameters(
@@ -152,13 +183,14 @@ class AgentPlanDeclareToolMethodTest {
Tool.fromMethod(
TestAgent.class.getMethod(
"getWeather", String.class,
String.class)));
- checkToolCall(new AgentPlan(agent));
+ AgentPlan addedPlan = new AgentPlan(agent);
+ checkToolCall(addedPlan);
}
@Test
@DisplayName("Parameter conversion and errors")
void paramConversionAndErrors() throws Exception {
- Tool calculator = (Tool) agentPlan.getResource("calculate",
ResourceType.TOOL);
+ Tool calculator = (Tool) resolveResource("calculate",
ResourceType.TOOL);
ToolResponse r =
calculator.call(
@@ -213,7 +245,7 @@ class AgentPlanDeclareToolMethodTest {
@Test
@DisplayName("Metadata and schema shape")
void metadataSchema() throws Exception {
- Tool calculator = (Tool) agentPlan.getResource("calculate",
ResourceType.TOOL);
+ Tool calculator = (Tool) resolveResource("calculate",
ResourceType.TOOL);
ToolMetadata md = calculator.getMetadata();
assertEquals("calculate", md.getName());
assertEquals("Performs basic arithmetic operations",
md.getDescription());
@@ -230,7 +262,16 @@ class AgentPlanDeclareToolMethodTest {
ObjectMapper mapper = new ObjectMapper();
String json = mapper.writeValueAsString(agentPlan);
AgentPlan restored = mapper.readValue(json, AgentPlan.class);
- Tool calculator = (Tool) restored.getResource("calculate",
ResourceType.TOOL);
+ Tool calculator =
+ (Tool)
+ restored.getResourceProviders()
+ .get(ResourceType.TOOL)
+ .get("calculate")
+ .provide(
+ (n, t) -> {
+ throw new
UnsupportedOperationException(
+ "No dependencies
expected");
+ });
ToolResponse r =
calculator.call(
new ToolParameters(
diff --git a/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanTest.java
b/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanTest.java
index ad513248..71896903 100644
--- a/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanTest.java
+++ b/plan/src/test/java/org/apache/flink/agents/plan/AgentPlanTest.java
@@ -24,8 +24,6 @@ import org.apache.flink.agents.api.OutputEvent;
import org.apache.flink.agents.api.agents.Agent;
import org.apache.flink.agents.api.annotation.ChatModelSetup;
import org.apache.flink.agents.api.annotation.Tool;
-import org.apache.flink.agents.api.chat.messages.ChatMessage;
-import org.apache.flink.agents.api.chat.model.python.PythonChatModelSetup;
import org.apache.flink.agents.api.context.RunnerContext;
import org.apache.flink.agents.api.resource.Resource;
import org.apache.flink.agents.api.resource.ResourceDescriptor;
@@ -33,10 +31,6 @@ import org.apache.flink.agents.api.resource.ResourceType;
import org.apache.flink.agents.api.resource.SerializableResource;
import org.apache.flink.agents.api.resource.python.PythonResourceAdapter;
import org.apache.flink.agents.api.resource.python.PythonResourceWrapper;
-import
org.apache.flink.agents.api.vectorstores.CollectionManageableVectorStore;
-import org.apache.flink.agents.api.vectorstores.Document;
-import org.apache.flink.agents.api.vectorstores.VectorStoreQuery;
-import org.apache.flink.agents.api.vectorstores.VectorStoreQueryResult;
import org.apache.flink.agents.plan.actions.Action;
import
org.apache.flink.agents.plan.resourceprovider.JavaSerializableResourceProvider;
import org.apache.flink.agents.plan.resourceprovider.PythonResourceProvider;
@@ -50,7 +44,6 @@ import java.util.Map;
import java.util.function.BiFunction;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Test for {@link AgentPlan} constructor that takes an Agent. */
public class AgentPlanTest {
@@ -178,72 +171,6 @@ public class AgentPlanTest {
}
}
- public static class TestPythonResourceAdapter implements
PythonResourceAdapter {
-
- @Override
- public Object getResource(String resourceName, String resourceType) {
- return null;
- }
-
- @Override
- public PyObject initPythonResource(
- String module, String clazz, Map<String, Object> kwargs) {
- return null;
- }
-
- @Override
- public Object toPythonChatMessage(ChatMessage message) {
- return null;
- }
-
- @Override
- public ChatMessage fromPythonChatMessage(Object pythonChatMessage) {
- return null;
- }
-
- @Override
- public Object toPythonDocuments(List<Document> documents) {
- return null;
- }
-
- @Override
- public List<Document> fromPythonDocuments(List<PyObject>
pythonDocuments) {
- return List.of();
- }
-
- @Override
- public Object toPythonVectorStoreQuery(VectorStoreQuery query) {
- return null;
- }
-
- @Override
- public VectorStoreQueryResult fromPythonVectorStoreQueryResult(
- PyObject pythonVectorStoreQueryResult) {
- return null;
- }
-
- @Override
- public CollectionManageableVectorStore.Collection fromPythonCollection(
- PyObject pythonCollection) {
- return null;
- }
-
- @Override
- public Object
convertToPythonTool(org.apache.flink.agents.api.tools.Tool tool) {
- return null;
- }
-
- @Override
- public Object callMethod(Object obj, String methodName, Map<String,
Object> kwargs) {
- return null;
- }
-
- @Override
- public Object invoke(String name, Object... args) {
- return null;
- }
- }
-
@Test
public void testConstructorWithAgent() throws Exception {
// Create an agent instance
@@ -371,20 +298,6 @@ public class AgentPlanTest {
123, actualPlan.getActionConfigValue("handleMultipleEvents",
"key"));
}
- @Test
- public void testGetResourceNotFound() throws Exception {
- TestAgent agent = new TestAgent();
- AgentPlan agentPlan = new AgentPlan(agent);
-
- // Test getting non-existent resource throws exception
- try {
- agentPlan.getResource("non-existent", ResourceType.CHAT_MODEL);
- assertThat(false).as("Should have thrown
IllegalArgumentException").isTrue();
- } catch (IllegalArgumentException e) {
- assertThat(e.getMessage()).contains("Resource not found:
non-existent");
- }
- }
-
@Test
public void testExtractResourceProvidersFromAgent() throws Exception {
// Create an agent with resource annotations
@@ -444,38 +357,4 @@ public class AgentPlanTest {
assertThat(pythonChatModelProvider.getName()).isEqualTo("pythonChatModel");
assertThat(pythonChatModelProvider.getType()).isEqualTo(ResourceType.CHAT_MODEL);
}
-
- @Test
- public void testGetResourceFromResourceProvider() throws Exception {
- // Create an agent with resource annotations
- TestAgentWithResources agent = new TestAgentWithResources();
- AgentPlan agentPlan = new AgentPlan(agent);
-
- // Test getting a tool resource
- Resource myTool = agentPlan.getResource("myTool", ResourceType.TOOL);
- assertThat(myTool).isNotNull();
- assertThat(myTool).isInstanceOf(TestTool.class);
- assertThat(myTool.getResourceType()).isEqualTo(ResourceType.TOOL);
-
- // Test getting a chat model resource
- Resource chatModel = agentPlan.getResource("chatModel",
ResourceType.CHAT_MODEL);
- assertThat(chatModel).isNotNull();
- assertThat(chatModel).isInstanceOf(TestSerializableChatModel.class);
-
assertThat(chatModel.getResourceType()).isEqualTo(ResourceType.CHAT_MODEL);
-
- assertThatThrownBy(() -> agentPlan.getResource("pythonChatModel",
ResourceType.CHAT_MODEL))
- .isInstanceOf(IllegalStateException.class)
- .hasMessageContaining("PythonResourceAdapter is not set");
-
- agentPlan.setPythonResourceAdapter(new TestPythonResourceAdapter());
- Resource pythonChatModel =
- agentPlan.getResource("pythonChatModel",
ResourceType.CHAT_MODEL);
- assertThat(pythonChatModel).isNotNull();
- assertThat(pythonChatModel).isInstanceOf(PythonChatModelSetup.class);
-
assertThat(pythonChatModel.getResourceType()).isEqualTo(ResourceType.CHAT_MODEL);
-
- // Test that resources are cached (should be the same instance)
- Resource myToolAgain = agentPlan.getResource("myTool",
ResourceType.TOOL);
- assertThat(myTool).isSameAs(myToolAgain);
- }
}
diff --git
a/plan/src/test/java/org/apache/flink/agents/plan/FunctionToolPlanTest.java
b/plan/src/test/java/org/apache/flink/agents/plan/FunctionToolPlanTest.java
index 4e99464a..a7cef61d 100644
--- a/plan/src/test/java/org/apache/flink/agents/plan/FunctionToolPlanTest.java
+++ b/plan/src/test/java/org/apache/flink/agents/plan/FunctionToolPlanTest.java
@@ -90,7 +90,16 @@ class FunctionToolPlanTest {
void functionToolAgentPlan() throws Exception {
AgentPlan plan = new AgentPlan(new TestAgent());
- FunctionTool javaTool = (FunctionTool) plan.getResource("javaTool",
ResourceType.TOOL);
+ FunctionTool javaTool =
+ (FunctionTool)
+ plan.getResourceProviders()
+ .get(ResourceType.TOOL)
+ .get("javaTool")
+ .provide(
+ (n, t) -> {
+ throw new
UnsupportedOperationException(
+ "No dependencies
expected");
+ });
ToolResponse ok =
javaTool.call(
new ToolParameters(
@@ -102,7 +111,16 @@ class FunctionToolPlanTest {
assertTrue(ok.isSuccess());
assertEquals(36.0, (Double) ok.getResult(), 1e-9);
- FunctionTool pyTool = (FunctionTool) plan.getResource("pyTool",
ResourceType.TOOL);
+ FunctionTool pyTool =
+ (FunctionTool)
+ plan.getResourceProviders()
+ .get(ResourceType.TOOL)
+ .get("pyTool")
+ .provide(
+ (n, t) -> {
+ throw new
UnsupportedOperationException(
+ "No dependencies
expected");
+ });
ToolResponse err = pyTool.call(new ToolParameters(new
HashMap<>(Map.of("x", 1))));
assertFalse(err.isSuccess());
}
diff --git a/python/flink_agents/plan/agent_plan.py
b/python/flink_agents/plan/agent_plan.py
index daf66128..2f31ff3c 100644
--- a/python/flink_agents/plan/agent_plan.py
+++ b/python/flink_agents/plan/agent_plan.py
@@ -20,7 +20,7 @@ from typing import TYPE_CHECKING, Any, Dict, List, cast
from pydantic import BaseModel, field_serializer, model_validator
from flink_agents.api.agents.agent import Agent
-from flink_agents.api.resource import Resource, ResourceDescriptor,
ResourceType
+from flink_agents.api.resource import ResourceDescriptor, ResourceType
from flink_agents.plan.actions.action import Action
from flink_agents.plan.actions.chat_model_action import CHAT_MODEL_ACTION
from flink_agents.plan.actions.context_retrieval_action import
CONTEXT_RETRIEVAL_ACTION
@@ -59,8 +59,6 @@ class AgentPlan(BaseModel):
actions_by_event: Dict[str, List[str]]
resource_providers: Dict[ResourceType, Dict[str, ResourceProvider]] | None
= None
config: AgentConfiguration | None = None
- __resources: Dict[ResourceType, Dict[str, Resource]] = {}
- __j_resource_adapter: Any = None
@field_serializer("resource_providers")
def __serialize_resource_providers(
@@ -199,40 +197,6 @@ class AgentPlan(BaseModel):
"""
return self.actions[action_name].config.get(key, None)
- def get_resource(self, name: str, type: ResourceType) -> Resource:
- """Get resource from agent plan.
-
- Parameters
- ----------
- name : str
- The name of the resource.
- type : ResourceType
- The type of the resource.
- """
- if type not in self.__resources:
- self.__resources[type] = {}
- if name not in self.__resources[type]:
- resource_provider = self.resource_providers[type][name]
- if isinstance(resource_provider, JavaResourceProvider):
-
resource_provider.set_java_resource_adapter(self.__j_resource_adapter)
- resource = resource_provider.provide(
- get_resource=self.get_resource, config=self.config
- )
- self.__resources[type][name] = resource
- return self.__resources[type][name]
-
- def set_java_resource_adapter(self, j_resource_adapter: Any) -> None:
- """Set java resource adapter for java resource provider."""
- self.__j_resource_adapter = j_resource_adapter
-
- def close(self) -> None:
- """Clean up the resources."""
- for type in self.__resources:
- for name in self.__resources[type]:
- self.__resources[type][name].close()
- self.__resources.clear()
-
-
def _get_actions(agent: Agent) -> List[Action]:
"""Extract all registered agent actions from an agent.
@@ -353,7 +317,7 @@ def _get_resource_providers(agent: Agent, config:
AgentConfiguration) -> List[Re
)
for name, descriptor in agent.resources[ResourceType.MCP_SERVER].items():
- _add_mcp_server(name, resource_providers, descriptor)
+ _add_mcp_server(name, resource_providers, descriptor, config)
for resource_type in [
ResourceType.CHAT_MODEL,
@@ -382,8 +346,8 @@ def _add_mcp_server(
resource_providers.append(provider)
- def get_resource(name: str, descriptor: ResourceDescriptor) -> Any:
- """Placeholder."""
+ def get_resource(name: str, type: ResourceType) -> Any:
+ """Placeholder - MCP server construction doesn't need resource
resolution."""
mcp_server = cast("MCPServer", provider.provide(get_resource=get_resource,
config=config))
diff --git a/python/flink_agents/plan/tests/test_agent_plan.py
b/python/flink_agents/plan/tests/test_agent_plan.py
index 6050c485..2b022095 100644
--- a/python/flink_agents/plan/tests/test_agent_plan.py
+++ b/python/flink_agents/plan/tests/test_agent_plan.py
@@ -45,6 +45,7 @@ from flink_agents.api.vector_stores.vector_store import (
from flink_agents.plan.agent_plan import AgentPlan
from flink_agents.plan.configuration import AgentConfiguration
from flink_agents.plan.function import PythonFunction
+from flink_agents.runtime.resource_cache import ResourceCache
class AgentForTest(Agent): # noqa D101
@@ -255,7 +256,8 @@ def test_agent_plan_deserialize(agent_plan: AgentPlan) ->
None: # noqa: D103
def test_get_resource() -> None: # noqa: D103
agent_plan = AgentPlan.from_agent(MyAgent(), AgentConfiguration())
- mock = agent_plan.get_resource("mock", ResourceType.CHAT_MODEL)
+ cache = ResourceCache(agent_plan.resource_providers, agent_plan.config)
+ mock = cache.get_resource("mock", ResourceType.CHAT_MODEL)
assert (
mock.chat(ChatMessage(role=MessageRole.USER, content="")).content
== "8.8.8.8 mock resource just for testing."
diff --git a/python/flink_agents/runtime/flink_runner_context.py
b/python/flink_agents/runtime/flink_runner_context.py
index 7591c062..2c593c0f 100644
--- a/python/flink_agents/runtime/flink_runner_context.py
+++ b/python/flink_agents/runtime/flink_runner_context.py
@@ -45,6 +45,7 @@ from
flink_agents.runtime.memory.vector_store_long_term_memory import (
VectorStoreLongTermMemory,
)
from flink_agents.runtime.python_java_utils import _build_event_log_string
+from flink_agents.runtime.resource_cache import ResourceCache
logger = logging.getLogger(__name__)
@@ -204,7 +205,10 @@ class FlinkRunnerContext(RunnerContext):
"""
self._j_runner_context = j_runner_context
self.__agent_plan = AgentPlan.model_validate_json(agent_plan_json)
- self.__agent_plan.set_java_resource_adapter(j_resource_adapter)
+ self.__resource_cache = ResourceCache(
+ self.__agent_plan.resource_providers, self.__agent_plan.config
+ )
+ self.__resource_cache.set_java_resource_adapter(j_resource_adapter)
self.executor = executor
def set_long_term_memory(self, ltm: InternalBaseLongTermMemory) -> None:
@@ -237,7 +241,7 @@ class FlinkRunnerContext(RunnerContext):
@override
def get_resource(self, name: str, type: ResourceType, metric_group:
MetricGroup = None) -> Resource:
- resource = self.__agent_plan.get_resource(name, type)
+ resource = self.__resource_cache.get_resource(name, type)
# Bind metric group to the resource
resource.set_metric_group(metric_group or self.action_metric_group)
return resource
@@ -503,11 +507,11 @@ class FlinkRunnerContext(RunnerContext):
if self.long_term_memory is not None:
self.long_term_memory.close()
- if self.__agent_plan is not None:
+ if self.__resource_cache is not None:
try:
- self.__agent_plan.close()
+ self.__resource_cache.close()
finally:
- self.__agent_plan = None
+ self.__resource_cache = None
def create_flink_runner_context(
diff --git a/python/flink_agents/runtime/local_runner.py
b/python/flink_agents/runtime/local_runner.py
index e4124439..75a73efb 100644
--- a/python/flink_agents/runtime/local_runner.py
+++ b/python/flink_agents/runtime/local_runner.py
@@ -35,6 +35,7 @@ from flink_agents.plan.agent_plan import AgentPlan
from flink_agents.plan.configuration import AgentConfiguration
from flink_agents.runtime.agent_runner import AgentRunner
from flink_agents.runtime.local_memory_object import LocalMemoryObject
+from flink_agents.runtime.resource_cache import ResourceCache
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
@@ -81,6 +82,9 @@ class LocalRunnerContext(RunnerContext):
KeyedStream.
"""
self.__agent_plan = agent_plan
+ self.__resource_cache = ResourceCache(
+ agent_plan.resource_providers, agent_plan.config
+ )
self.__key = key
self.events = deque()
self._sensory_mem_store = {}
@@ -120,7 +124,7 @@ class LocalRunnerContext(RunnerContext):
@override
def get_resource(self, name: str, type: ResourceType, metric_group:
MetricGroup = None) -> Resource:
- return self.__agent_plan.get_resource(name, type)
+ return self.__resource_cache.get_resource(name, type)
@property
@override
@@ -244,11 +248,11 @@ class LocalRunnerContext(RunnerContext):
def close(self) -> None:
"""Cleanup the resource."""
- if self.__agent_plan is not None:
+ if self.__resource_cache is not None:
try:
- self.__agent_plan.close()
+ self.__resource_cache.close()
finally:
- self.__agent_plan = None
+ self.__resource_cache = None
class LocalRunner(AgentRunner):
diff --git a/python/flink_agents/runtime/resource_cache.py
b/python/flink_agents/runtime/resource_cache.py
new file mode 100644
index 00000000..fa0ca617
--- /dev/null
+++ b/python/flink_agents/runtime/resource_cache.py
@@ -0,0 +1,90 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#################################################################################
+from typing import Any, Dict
+
+from flink_agents.api.resource import Resource, ResourceType
+from flink_agents.plan.configuration import AgentConfiguration
+from flink_agents.plan.resource_provider import JavaResourceProvider,
ResourceProvider
+
+
+class ResourceCache:
+    """Lazily resolves and caches Resource instances from ResourceProviders.
+
+    Resources are created on first access via their provider's ``provide()`` method
+    and cached for subsequent lookups. Supports recursive dependency resolution — a
+    resource can depend on other resources.
+
+    This class is designed for single-threaded access within Flink's mailbox
+    execution model.
+    """
+
+    def __init__(
+        self,
+        resource_providers: Dict[ResourceType, Dict[str, ResourceProvider]],
+        config: AgentConfiguration | None = None,
+    ) -> None:
+        """Create a ResourceCache from the given resource providers and config.
+
+        Parameters
+        ----------
+        resource_providers : Dict[ResourceType, Dict[str, ResourceProvider]]
+            Two-level mapping of resource type to resource name to provider.
+            A falsy value (``None`` or an empty dict) is treated as "no providers".
+        config : AgentConfiguration | None
+            Agent configuration passed to providers during resource creation.
+        """
+        self._resource_providers = resource_providers or {}
+        self._config = config
+        self._cache: Dict[ResourceType, Dict[str, Resource]] = {}
+        self._j_resource_adapter: Any = None
+
+    def set_java_resource_adapter(self, j_resource_adapter: Any) -> None:
+        """Set Java resource adapter for Java resource providers."""
+        self._j_resource_adapter = j_resource_adapter
+
+    def get_resource(self, name: str, type: ResourceType) -> Resource:
+        """Get resource by name and type, creating it from its provider if not cached.
+
+        Parameters
+        ----------
+        name : str
+            The name of the resource.
+        type : ResourceType
+            The type of the resource.
+
+        Returns
+        -------
+        Resource
+            The cached instance, or a newly provided one that is cached before
+            being returned.
+
+        Raises
+        ------
+        KeyError
+            If no provider is registered for ``name`` under ``type``.
+        """
+        cached = self._cache.get(type, {}).get(name)
+        if cached is not None:
+            return cached
+        providers = self._resource_providers.get(type)
+        if providers is None or name not in providers:
+            msg = f"Resource not found: '{name}' of type {type}"
+            raise KeyError(msg)
+        resource_provider = providers[name]
+        if isinstance(resource_provider, JavaResourceProvider):
+            # Java-backed providers need the adapter before they can provide.
+            resource_provider.set_java_resource_adapter(self._j_resource_adapter)
+        # Passing ``self.get_resource`` lets the provider resolve (and cache) any
+        # resources the new one depends on.
+        resource = resource_provider.provide(
+            get_resource=self.get_resource, config=self._config
+        )
+        self._cache.setdefault(type, {})[name] = resource
+        return resource
+
+    def close(self) -> None:
+        """Close every cached resource and empty the cache.
+
+        A failing ``close()`` does not prevent the remaining resources from being
+        closed, and the cache is emptied in all cases. The first error is re-raised
+        once every resource has been attempted; any later errors are logged.
+        """
+        import logging
+
+        first_error: Exception | None = None
+        try:
+            for typed in self._cache.values():
+                for resource in typed.values():
+                    try:
+                        resource.close()
+                    except Exception as e:  # noqa: BLE001
+                        # Keep going so one bad resource cannot leak the others.
+                        if first_error is None:
+                            first_error = e
+                        else:
+                            logging.getLogger(__name__).warning(
+                                "Failed to close resource: %s", e
+                            )
+        finally:
+            self._cache.clear()
+        if first_error is not None:
+            raise first_error
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/PythonMCPResourceDiscovery.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/PythonMCPResourceDiscovery.java
new file mode 100644
index 00000000..10e21968
--- /dev/null
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/PythonMCPResourceDiscovery.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.runtime;
+
+import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.api.resource.python.PythonResourceAdapter;
+import org.apache.flink.agents.plan.resource.python.PythonMCPPrompt;
+import org.apache.flink.agents.plan.resource.python.PythonMCPServer;
+import org.apache.flink.agents.plan.resource.python.PythonMCPTool;
+import org.apache.flink.agents.plan.resourceprovider.PythonResourceProvider;
+import org.apache.flink.agents.plan.resourceprovider.ResourceProvider;
+
+import java.util.Map;
+
+import static org.apache.flink.agents.api.resource.ResourceType.MCP_SERVER;
+import static org.apache.flink.agents.api.resource.ResourceType.PROMPT;
+import static org.apache.flink.agents.api.resource.ResourceType.TOOL;
+
+/**
+ * Populates a {@link ResourceCache} with the tools and prompts exposed by Python MCP servers.
+ *
+ * <p>Called once during operator initialization after the Python interpreter is available.
+ */
+public class PythonMCPResourceDiscovery {
+
+    /**
+     * Hands the Python adapter to the cache, then builds every Python-backed MCP server found
+     * among the resource providers and registers the tools and prompts each server lists.
+     *
+     * @param resourceProviders the resource providers from the agent plan
+     * @param adapter the Python resource adapter
+     * @param cache the resource cache to register discovered resources in
+     * @throws Exception if a Python MCP server fails to initialize
+     */
+    public static void discoverPythonMCPResources(
+            Map<ResourceType, Map<String, ResourceProvider>> resourceProviders,
+            PythonResourceAdapter adapter,
+            ResourceCache cache)
+            throws Exception {
+
+        // The adapter is stored on the cache first so that later cache.getResource() calls on
+        // non-MCP Python resources (e.g. PythonChatModelSetup) have it available, even when the
+        // plan declares no MCP server at all.
+        cache.setPythonResourceAdapter(adapter);
+
+        Map<String, ResourceProvider> mcpServerProviders = resourceProviders.get(MCP_SERVER);
+        if (mcpServerProviders != null) {
+            for (ResourceProvider candidate : mcpServerProviders.values()) {
+                // Only Python-backed MCP servers are handled here.
+                if (candidate instanceof PythonResourceProvider) {
+                    registerToolsAndPrompts((PythonResourceProvider) candidate, adapter, cache);
+                }
+            }
+        }
+    }
+
+    /** Builds one Python MCP server and puts its listed tools and prompts into the cache. */
+    private static void registerToolsAndPrompts(
+            PythonResourceProvider provider, PythonResourceAdapter adapter, ResourceCache cache)
+            throws Exception {
+        provider.setPythonResourceAdapter(adapter);
+
+        // Dependencies requested while the server is being built are resolved through the
+        // cache; its checked exceptions are wrapped because the callback cannot throw them.
+        PythonMCPServer server =
+                (PythonMCPServer)
+                        provider.provide(
+                                (dependencyName, dependencyType) -> {
+                                    try {
+                                        return cache.getResource(dependencyName, dependencyType);
+                                    } catch (Exception e) {
+                                        throw new RuntimeException(e);
+                                    }
+                                });
+
+        for (PythonMCPTool tool : server.listTools()) {
+            cache.put(tool.getName(), TOOL, tool);
+        }
+        for (PythonMCPPrompt prompt : server.listPrompts()) {
+            cache.put(prompt.getName(), PROMPT, prompt);
+        }
+    }
+}
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/ResourceCache.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/ResourceCache.java
new file mode 100644
index 00000000..3537a5a7
--- /dev/null
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/ResourceCache.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.runtime;
+
+import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.api.resource.python.PythonResourceAdapter;
+import org.apache.flink.agents.plan.resourceprovider.PythonResourceProvider;
+import org.apache.flink.agents.plan.resourceprovider.ResourceProvider;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Cache of {@link Resource} instances that are built on demand from {@link ResourceProvider}s.
+ *
+ * <p>The first lookup of a (name, type) pair asks the matching provider to {@code provide()} the
+ * resource; the result is stored and handed back on every later lookup. A resource being built
+ * may itself look up other resources through this cache.
+ *
+ * <p>Intended to be thread-safe, because resolution can happen on async pool threads (e.g. when
+ * {@code BaseChatModelSetup.chat()} resolves connection, prompt, and tools inside a {@code
+ * durableExecuteAsync} callable): {@link #getResource} is synchronized, the cache maps are
+ * concurrent maps, and the adapter field is volatile.
+ */
+public class ResourceCache implements AutoCloseable {
+
+    private final Map<ResourceType, Map<String, ResourceProvider>> resourceProviders;
+    private final Map<ResourceType, Map<String, Resource>> cache = new ConcurrentHashMap<>();
+    private volatile PythonResourceAdapter pythonResourceAdapter;
+
+    public ResourceCache(Map<ResourceType, Map<String, ResourceProvider>> resourceProviders) {
+        // Two-level snapshot, so later changes to the caller's maps do not leak into the cache.
+        Map<ResourceType, Map<String, ResourceProvider>> snapshot = new HashMap<>();
+        resourceProviders.forEach((type, byName) -> snapshot.put(type, new HashMap<>(byName)));
+        this.resourceProviders = snapshot;
+    }
+
+    void setPythonResourceAdapter(PythonResourceAdapter adapter) {
+        this.pythonResourceAdapter = adapter;
+    }
+
+    /**
+     * Returns the resource registered under the given name and type, building it from its
+     * provider on first use.
+     *
+     * @param name the resource name
+     * @param type the resource type
+     * @return the resource instance
+     * @throws Exception if the resource cannot be found or created
+     */
+    public synchronized Resource getResource(String name, ResourceType type) throws Exception {
+        Map<String, Resource> cachedOfType = cache.get(type);
+        Resource hit = cachedOfType == null ? null : cachedOfType.get(name);
+        if (hit != null) {
+            return hit;
+        }
+
+        Map<String, ResourceProvider> candidates = resourceProviders.get(type);
+        boolean known = candidates != null && candidates.containsKey(name);
+        if (!known) {
+            throw new IllegalArgumentException("Resource not found: " + name + " of type " + type);
+        }
+        ResourceProvider provider = candidates.get(name);
+
+        // Python-backed providers receive the adapter right before use, once one has been set.
+        if (provider instanceof PythonResourceProvider && pythonResourceAdapter != null) {
+            ((PythonResourceProvider) provider).setPythonResourceAdapter(pythonResourceAdapter);
+        }
+
+        Resource created = provider.provide(this::resolveDependency);
+        bucketFor(type).put(name, created);
+        return created;
+    }
+
+    /**
+     * Stores a ready-made resource in the cache under the given name and type.
+     *
+     * @param name the resource name
+     * @param type the resource type
+     * @param resource the resource instance
+     */
+    public void put(String name, ResourceType type, Resource resource) {
+        bucketFor(type).put(name, resource);
+    }
+
+    @Override
+    public void close() throws Exception {
+        // Attempt every close; the first failure is rethrown with the later ones suppressed.
+        Exception failure = null;
+        for (Map<String, Resource> bucket : cache.values()) {
+            for (Resource resource : bucket.values()) {
+                try {
+                    resource.close();
+                } catch (Exception e) {
+                    if (failure != null) {
+                        failure.addSuppressed(e);
+                    } else {
+                        failure = e;
+                    }
+                }
+            }
+        }
+        cache.clear();
+        if (failure != null) {
+            throw failure;
+        }
+    }
+
+    /** Lookup callback handed to providers; wraps checked failures in a RuntimeException. */
+    private Resource resolveDependency(String anotherName, ResourceType anotherType) {
+        try {
+            return this.getResource(anotherName, anotherType);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Returns the per-type cache map, creating it on first use. */
+    private Map<String, Resource> bucketFor(ResourceType type) {
+        return cache.computeIfAbsent(type, k -> new ConcurrentHashMap<>());
+    }
+}
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/context/JavaRunnerContextImpl.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/context/JavaRunnerContextImpl.java
index a18bb30b..ce5f3f11 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/context/JavaRunnerContextImpl.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/context/JavaRunnerContextImpl.java
@@ -19,6 +19,7 @@ package org.apache.flink.agents.runtime.context;
import org.apache.flink.agents.api.context.DurableCallable;
import org.apache.flink.agents.plan.AgentPlan;
+import org.apache.flink.agents.runtime.ResourceCache;
import org.apache.flink.agents.runtime.async.ContinuationActionExecutor;
import org.apache.flink.agents.runtime.async.ContinuationContext;
import org.apache.flink.agents.runtime.metrics.FlinkAgentsMetricGroupImpl;
@@ -38,9 +39,10 @@ public class JavaRunnerContextImpl extends RunnerContextImpl
{
FlinkAgentsMetricGroupImpl agentMetricGroup,
Runnable mailboxThreadChecker,
AgentPlan agentPlan,
+ ResourceCache resourceCache,
String jobIdentifier,
ContinuationActionExecutor continuationExecutor) {
- super(agentMetricGroup, mailboxThreadChecker, agentPlan,
jobIdentifier);
+ super(agentMetricGroup, mailboxThreadChecker, agentPlan,
resourceCache, jobIdentifier);
this.continuationExecutor = continuationExecutor;
}
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
index cab790dc..8c7b9fc3 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
@@ -33,6 +33,7 @@ import org.apache.flink.agents.api.resource.ResourceType;
import org.apache.flink.agents.plan.AgentPlan;
import org.apache.flink.agents.plan.actions.Action;
import org.apache.flink.agents.plan.utils.JsonUtils;
+import org.apache.flink.agents.runtime.ResourceCache;
import org.apache.flink.agents.runtime.actionstate.ActionState;
import org.apache.flink.agents.runtime.actionstate.CallResult;
import org.apache.flink.agents.runtime.memory.CachedMemoryStore;
@@ -98,6 +99,7 @@ public class RunnerContextImpl implements RunnerContext {
protected final FlinkAgentsMetricGroupImpl agentMetricGroup;
protected final Runnable mailboxThreadChecker;
protected final AgentPlan agentPlan;
+ protected final ResourceCache resourceCache;
protected MemoryContext memoryContext;
protected String actionName;
@@ -110,10 +112,12 @@ public class RunnerContextImpl implements RunnerContext {
FlinkAgentsMetricGroupImpl agentMetricGroup,
Runnable mailboxThreadChecker,
AgentPlan agentPlan,
+ ResourceCache resourceCache,
String jobIdentifier) {
this.agentMetricGroup = agentMetricGroup;
this.mailboxThreadChecker = mailboxThreadChecker;
this.agentPlan = agentPlan;
+ this.resourceCache = resourceCache;
LongTermMemoryOptions.LongTermMemoryBackend backend =
this.getConfig().get(LongTermMemoryOptions.BACKEND);
@@ -221,10 +225,10 @@ public class RunnerContextImpl implements RunnerContext {
@Override
public Resource getResource(String name, ResourceType type) throws
Exception {
- if (agentPlan == null) {
- throw new IllegalStateException("AgentPlan is not available in
this context");
+ if (resourceCache == null) {
+ throw new IllegalStateException("ResourceCache is not available in
this context");
}
- Resource resource = agentPlan.getResource(name, type);
+ Resource resource = resourceCache.getResource(name, type);
// Set current action's metric group to the resource
resource.setMetricGroup(getActionMetricGroup());
return resource;
@@ -394,8 +398,6 @@ public class RunnerContextImpl implements RunnerContext {
this.ltm.close();
this.ltm = null;
}
-
- this.agentPlan.close();
}
public String getActionName() {
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
index 89e35131..e5015a3a 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
@@ -35,6 +35,8 @@ import org.apache.flink.agents.plan.JavaFunction;
import org.apache.flink.agents.plan.PythonFunction;
import org.apache.flink.agents.plan.actions.Action;
import org.apache.flink.agents.plan.resourceprovider.PythonResourceProvider;
+import org.apache.flink.agents.runtime.PythonMCPResourceDiscovery;
+import org.apache.flink.agents.runtime.ResourceCache;
import org.apache.flink.agents.runtime.actionstate.ActionState;
import org.apache.flink.agents.runtime.actionstate.ActionStateStore;
import org.apache.flink.agents.runtime.actionstate.KafkaActionStateStore;
@@ -131,6 +133,8 @@ public class ActionExecutionOperator<IN, OUT> extends
AbstractStreamOperator<OUT
private final AgentPlan agentPlan;
+ private transient ResourceCache resourceCache;
+
private final Boolean inputIsJava;
private transient StreamRecord<OUT> reusedStreamRecord;
@@ -266,6 +270,8 @@ public class ActionExecutionOperator<IN, OUT> extends
AbstractStreamOperator<OUT
TypeInformation.of(MemoryObjectImpl.MemoryItem.class));
shortTermMemState =
getRuntimeContext().getMapState(shortTermMemStateDescriptor);
+ resourceCache = new ResourceCache(agentPlan.getResourceProviders());
+
metricGroup = new FlinkAgentsMetricGroupImpl(getMetricGroup());
builtInMetrics = new BuiltInMetrics(metricGroup, agentPlan);
@@ -606,7 +612,7 @@ public class ActionExecutionOperator<IN, OUT> extends
AbstractStreamOperator<OUT
private Resource getResource(String name, ResourceType type) {
try {
- return agentPlan.getResource(name, type);
+ return resourceCache.getResource(name, type);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -651,6 +657,7 @@ public class ActionExecutionOperator<IN, OUT> extends
AbstractStreamOperator<OUT
this.metricGroup,
this::checkMailboxThread,
this.agentPlan,
+ this.resourceCache,
this.jobIdentifier);
javaResourceAdapter = new JavaResourceAdapter(this::getResource,
pythonInterpreter);
@@ -679,7 +686,7 @@ public class ActionExecutionOperator<IN, OUT> extends
AbstractStreamOperator<OUT
new PythonResourceAdapterImpl(
(String anotherName, ResourceType anotherType) -> {
try {
- return agentPlan.getResource(anotherName,
anotherType);
+ return resourceCache.getResource(anotherName,
anotherType);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -687,7 +694,8 @@ public class ActionExecutionOperator<IN, OUT> extends
AbstractStreamOperator<OUT
pythonInterpreter,
javaResourceAdapter);
pythonResourceAdapter.open();
- agentPlan.setPythonResourceAdapter(pythonResourceAdapter);
+ PythonMCPResourceDiscovery.discoverPythonMCPResources(
+ agentPlan.getResourceProviders(), pythonResourceAdapter,
resourceCache);
}
@Override
@@ -704,6 +712,10 @@ public class ActionExecutionOperator<IN, OUT> extends
AbstractStreamOperator<OUT
@Override
public void close() throws Exception {
+ // Must close before pythonInterpreter since cached resources may hold
Python references.
+ if (resourceCache != null) {
+ resourceCache.close();
+ }
if (runnerContext != null) {
try {
runnerContext.close();
@@ -726,9 +738,6 @@ public class ActionExecutionOperator<IN, OUT> extends
AbstractStreamOperator<OUT
if (actionStateStore != null) {
actionStateStore.close();
}
- if (runnerContext != null) {
- runnerContext.close();
- }
if (continuationActionExecutor != null) {
continuationActionExecutor.close();
}
@@ -1096,6 +1105,7 @@ public class ActionExecutionOperator<IN, OUT> extends
AbstractStreamOperator<OUT
this.metricGroup,
this::checkMailboxThread,
this.agentPlan,
+ this.resourceCache,
this.jobIdentifier,
continuationActionExecutor);
}
@@ -1107,6 +1117,7 @@ public class ActionExecutionOperator<IN, OUT> extends
AbstractStreamOperator<OUT
this.metricGroup,
this::checkMailboxThread,
this.agentPlan,
+ this.resourceCache,
jobIdentifier);
}
return pythonRunnerContext;
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java
index 7bfea4ad..e9741baf 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/context/PythonRunnerContextImpl.java
@@ -21,6 +21,7 @@ package org.apache.flink.agents.runtime.python.context;
import org.apache.flink.agents.api.Event;
import org.apache.flink.agents.api.context.RunnerContext;
import org.apache.flink.agents.plan.AgentPlan;
+import org.apache.flink.agents.runtime.ResourceCache;
import org.apache.flink.agents.runtime.context.RunnerContextImpl;
import org.apache.flink.agents.runtime.metrics.FlinkAgentsMetricGroupImpl;
import org.apache.flink.agents.runtime.python.event.PythonEvent;
@@ -42,8 +43,9 @@ public class PythonRunnerContextImpl extends
RunnerContextImpl {
FlinkAgentsMetricGroupImpl agentMetricGroup,
Runnable mailboxThreadChecker,
AgentPlan agentPlan,
+ ResourceCache resourceCache,
String jobIdentifier) {
- super(agentMetricGroup, mailboxThreadChecker, agentPlan,
jobIdentifier);
+ super(agentMetricGroup, mailboxThreadChecker, agentPlan,
resourceCache, jobIdentifier);
}
@Override
diff --git
a/runtime/src/test/java/org/apache/flink/agents/runtime/ResourceCacheTest.java
b/runtime/src/test/java/org/apache/flink/agents/runtime/ResourceCacheTest.java
new file mode 100644
index 00000000..162c001f
--- /dev/null
+++
b/runtime/src/test/java/org/apache/flink/agents/runtime/ResourceCacheTest.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.agents.runtime;
+
+import org.apache.flink.agents.api.InputEvent;
+import org.apache.flink.agents.api.agents.Agent;
+import org.apache.flink.agents.api.annotation.ChatModelSetup;
+import org.apache.flink.agents.api.annotation.Tool;
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.chat.model.python.PythonChatModelSetup;
+import org.apache.flink.agents.api.context.RunnerContext;
+import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.api.resource.SerializableResource;
+import org.apache.flink.agents.api.resource.python.PythonResourceAdapter;
+import org.apache.flink.agents.api.resource.python.PythonResourceWrapper;
+import
org.apache.flink.agents.api.vectorstores.CollectionManageableVectorStore;
+import org.apache.flink.agents.api.vectorstores.Document;
+import org.apache.flink.agents.api.vectorstores.VectorStoreQuery;
+import org.apache.flink.agents.api.vectorstores.VectorStoreQueryResult;
+import org.apache.flink.agents.plan.AgentPlan;
+import org.junit.jupiter.api.Test;
+import pemja.core.object.PyObject;
+
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiFunction;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Tests for {@link ResourceCache}.
+ *
+ * <p>The nested types below are fixtures: simple resources, an agent that declares them, and a
+ * no-op {@link PythonResourceAdapter}.
+ */
+public class ResourceCacheTest {
+
+    /** Test tool resource class; a named resource whose type is {@link ResourceType#TOOL}. */
+    public static class TestTool extends SerializableResource {
+        private final String name;
+
+        public TestTool(String name) {
+            this.name = name;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        @Override
+        public ResourceType getResourceType() {
+            return ResourceType.TOOL;
+        }
+    }
+
+    /**
+     * Test serializable chat model resource class; a named resource whose type is {@link
+     * ResourceType#CHAT_MODEL}.
+     */
+    public static class TestSerializableChatModel extends SerializableResource {
+        private final String name;
+
+        public TestSerializableChatModel(String name) {
+            this.name = name;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        @Override
+        public ResourceType getResourceType() {
+            return ResourceType.CHAT_MODEL;
+        }
+    }
+
+    /**
+     * Test resource implementing {@link PythonResourceWrapper}. It reports the CHAT_MODEL type
+     * and holds no Python object: {@link #getPythonResource()} returns {@code null}.
+     */
+    public static class TestPythonResource extends Resource implements PythonResourceWrapper {
+
+        // The adapter and chatModel arguments are accepted but not used by this fixture.
+        public TestPythonResource(
+                PythonResourceAdapter adapter,
+                PyObject chatModel,
+                ResourceDescriptor descriptor,
+                BiFunction<String, ResourceType, Resource> getResource) {
+            super(descriptor, getResource);
+        }
+
+        @Override
+        public ResourceType getResourceType() {
+            return ResourceType.CHAT_MODEL;
+        }
+
+        @Override
+        public Object getPythonResource() {
+            return null;
+        }
+    }
+
+    /**
+     * Test agent class with resource annotations: two tool fields, one chat model field, and one
+     * chat model declared through a static {@link ResourceDescriptor} method.
+     */
+    public static class TestAgentWithResources extends Agent {
+
+        @Tool private TestTool myTool = new TestTool("myTool");
+
+        @ChatModelSetup
+        private TestSerializableChatModel chatModel =
+                new TestSerializableChatModel("defaultChatModel");
+
+        // Descriptor-based chat model; the test below looks it up under "pythonChatModel".
+        @ChatModelSetup
+        public static ResourceDescriptor pythonChatModel() {
+            return ResourceDescriptor.Builder.newBuilder(TestPythonResource.class.getName())
+                    .addInitialArgument("pythonClazz", "test.module.TestClazz")
+                    .build();
+        }
+
+        @Tool private TestTool anotherTool = new TestTool("anotherTool");
+
+        // Empty action body; the test does not execute it.
+        @org.apache.flink.agents.api.annotation.Action(listenEvents = {InputEvent.class})
+        public void handleInputEvent(InputEvent event, RunnerContext context) {}
+    }
+
+    /**
+     * No-op {@link PythonResourceAdapter}: every method returns {@code null}, except {@link
+     * #fromPythonDocuments} which returns an empty list.
+     */
+    public static class TestPythonResourceAdapter implements PythonResourceAdapter {
+
+        @Override
+        public Object getResource(String resourceName, String resourceType) {
+            return null;
+        }
+
+        @Override
+        public PyObject initPythonResource(
+                String module, String clazz, Map<String, Object> kwargs) {
+            return null;
+        }
+
+        @Override
+        public Object toPythonChatMessage(ChatMessage message) {
+            return null;
+        }
+
+        @Override
+        public ChatMessage fromPythonChatMessage(Object pythonChatMessage) {
+            return null;
+        }
+
+        @Override
+        public Object toPythonDocuments(List<Document> documents) {
+            return null;
+        }
+
+        @Override
+        public List<Document> fromPythonDocuments(List<PyObject> pythonDocuments) {
+            return List.of();
+        }
+
+        @Override
+        public Object toPythonVectorStoreQuery(VectorStoreQuery query) {
+            return null;
+        }
+
+        @Override
+        public VectorStoreQueryResult fromPythonVectorStoreQueryResult(
+                PyObject pythonVectorStoreQueryResult) {
+            return null;
+        }
+
+        @Override
+        public CollectionManageableVectorStore.Collection fromPythonCollection(
+                PyObject pythonCollection) {
+            return null;
+        }
+
+        @Override
+        public Object convertToPythonTool(org.apache.flink.agents.api.tools.Tool tool) {
+            return null;
+        }
+
+        @Override
+        public Object callMethod(Object obj, String methodName, Map<String, Object> kwargs) {
+            return null;
+        }
+
+        @Override
+        public Object invoke(String name, Object... args) {
+            return null;
+        }
+    }
+
+    /** Looking up a name with no provider must fail with an IllegalArgumentException. */
+    @Test
+    public void testGetResourceNotFound() throws Exception {
+        Agent agent = new Agent();
+        AgentPlan agentPlan = new AgentPlan(agent);
+        ResourceCache cache = new ResourceCache(agentPlan.getResourceProviders());
+
+        assertThatThrownBy(() -> cache.getResource("non-existent", ResourceType.CHAT_MODEL))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining("Resource not found: non-existent");
+    }
+
+    /**
+     * Resolves the resources declared on {@link TestAgentWithResources} through the cache:
+     * Java resources directly, the Python-backed chat model only once an adapter has been
+     * supplied, and repeated lookups returning the same instance.
+     */
+    @Test
+    public void testGetResourceFromResourceProvider() throws Exception {
+        TestAgentWithResources agent = new TestAgentWithResources();
+        AgentPlan agentPlan = new AgentPlan(agent);
+        ResourceCache cache = new ResourceCache(agentPlan.getResourceProviders());
+
+        // Test getting a tool resource
+        Resource myTool = cache.getResource("myTool", ResourceType.TOOL);
+        assertThat(myTool).isNotNull();
+        assertThat(myTool).isInstanceOf(TestTool.class);
+        assertThat(myTool.getResourceType()).isEqualTo(ResourceType.TOOL);
+
+        // Test getting a chat model resource
+        Resource chatModel = cache.getResource("chatModel", ResourceType.CHAT_MODEL);
+        assertThat(chatModel).isNotNull();
+        assertThat(chatModel).isInstanceOf(TestSerializableChatModel.class);
+        assertThat(chatModel.getResourceType()).isEqualTo(ResourceType.CHAT_MODEL);
+
+        // No adapter has been given to the cache yet, so the Python-backed chat model is
+        // expected to be rejected.
+        assertThatThrownBy(() -> cache.getResource("pythonChatModel", ResourceType.CHAT_MODEL))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("PythonResourceAdapter is not set");
+
+        // Discovery stores the adapter on the cache; the same lookup is then expected to succeed.
+        PythonMCPResourceDiscovery.discoverPythonMCPResources(
+                agentPlan.getResourceProviders(), new TestPythonResourceAdapter(), cache);
+        Resource pythonChatModel = cache.getResource("pythonChatModel", ResourceType.CHAT_MODEL);
+        assertThat(pythonChatModel).isNotNull();
+        assertThat(pythonChatModel).isInstanceOf(PythonChatModelSetup.class);
+        assertThat(pythonChatModel.getResourceType()).isEqualTo(ResourceType.CHAT_MODEL);
+
+        // Test that resources are cached (should be the same instance)
+        Resource myToolAgain = cache.getResource("myTool", ResourceType.TOOL);
+        assertThat(myTool).isSameAs(myToolAgain);
+    }
+}