This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push:
new 4452abeb [api][runtime] Clean up resource when close. (#428)
4452abeb is described below
commit 4452abebe1f7e29e816fd25e633c62f2d32b8259
Author: Wenjin Xie <[email protected]>
AuthorDate: Wed Jan 14 17:37:17 2026 +0800
[api][runtime] Clean up resource when close. (#428)
---
.../api/chat/model/python/PythonChatModelConnection.java | 5 +++++
.../model/python/PythonEmbeddingModelConnection.java | 5 +++++
.../org/apache/flink/agents/api/resource/Resource.java | 3 +++
.../anthropic/AnthropicChatModelConnection.java | 5 +++++
.../chatmodels/openai/OpenAIChatModelConnection.java | 5 +++++
.../apache/flink/agents/integrations/mcp/MCPPrompt.java | 5 +++++
.../apache/flink/agents/integrations/mcp/MCPTool.java | 5 +++++
.../elasticsearch/ElasticsearchVectorStore.java | 5 +++++
.../java/org/apache/flink/agents/plan/AgentPlan.java | 9 +++++++++
python/flink_agents/api/resource.py | 3 +++
python/flink_agents/api/runner_context.py | 4 ++++
.../chat_models/anthropic/anthropic_chat_model.py | 9 +++++++++
.../integrations/chat_models/openai/openai_chat_model.py | 9 +++++++++
.../embedding_models/openai_embedding_model.py | 12 +++++++++++-
python/flink_agents/integrations/mcp/mcp.py | 16 ++++++++++++++++
python/flink_agents/plan/agent_plan.py | 8 ++++++++
python/flink_agents/runtime/flink_runner_context.py | 16 +++++++++++++++-
python/flink_agents/runtime/java/java_chat_model.py | 4 ++++
.../flink_agents/runtime/java/java_resource_wrapper.py | 4 ++++
python/flink_agents/runtime/local_runner.py | 10 +++++++++-
.../flink/agents/runtime/context/RunnerContextImpl.java | 2 ++
.../agents/runtime/operator/ActionExecutionOperator.java | 7 +++++++
.../runtime/python/utils/PythonActionExecutor.java | 11 +++++++++++
23 files changed, 159 insertions(+), 3 deletions(-)
diff --git
a/api/src/main/java/org/apache/flink/agents/api/chat/model/python/PythonChatModelConnection.java
b/api/src/main/java/org/apache/flink/agents/api/chat/model/python/PythonChatModelConnection.java
index 7cae38bd..42018f6d 100644
---
a/api/src/main/java/org/apache/flink/agents/api/chat/model/python/PythonChatModelConnection.java
+++
b/api/src/main/java/org/apache/flink/agents/api/chat/model/python/PythonChatModelConnection.java
@@ -87,4 +87,9 @@ public class PythonChatModelConnection extends
BaseChatModelConnection
Object pythonMessageResponse = adapter.callMethod(chatModel, "chat",
kwargs);
return adapter.fromPythonChatMessage(pythonMessageResponse);
}
+
+ /** Closes this connection by delegating to {@code close()} on the wrapped {@code chatModel}. */
+ @Override
+ public void close() throws Exception {
+ this.chatModel.close();
+ }
}
diff --git
a/api/src/main/java/org/apache/flink/agents/api/embedding/model/python/PythonEmbeddingModelConnection.java
b/api/src/main/java/org/apache/flink/agents/api/embedding/model/python/PythonEmbeddingModelConnection.java
index b4ff933a..f91896d1 100644
---
a/api/src/main/java/org/apache/flink/agents/api/embedding/model/python/PythonEmbeddingModelConnection.java
+++
b/api/src/main/java/org/apache/flink/agents/api/embedding/model/python/PythonEmbeddingModelConnection.java
@@ -124,4 +124,9 @@ public class PythonEmbeddingModelConnection extends
BaseEmbeddingModelConnection
public Object getPythonResource() {
return embeddingModel;
}
+
+ /** Closes this connection by delegating to {@code close()} on the wrapped {@code embeddingModel}. */
+ @Override
+ public void close() throws Exception {
+ this.embeddingModel.close();
+ }
}
diff --git
a/api/src/main/java/org/apache/flink/agents/api/resource/Resource.java
b/api/src/main/java/org/apache/flink/agents/api/resource/Resource.java
index 78b15ad0..d386ca6d 100644
--- a/api/src/main/java/org/apache/flink/agents/api/resource/Resource.java
+++ b/api/src/main/java/org/apache/flink/agents/api/resource/Resource.java
@@ -64,4 +64,7 @@ public abstract class Resource {
protected FlinkAgentsMetricGroup getMetricGroup() {
return metricGroup;
}
+
+ /** Close the resource. This default implementation does nothing; subclasses override it to release what they hold. */
+ public void close() throws Exception {}
}
diff --git
a/integrations/chat-models/anthropic/src/main/java/org/apache/flink/agents/integrations/chatmodels/anthropic/AnthropicChatModelConnection.java
b/integrations/chat-models/anthropic/src/main/java/org/apache/flink/agents/integrations/chatmodels/anthropic/AnthropicChatModelConnection.java
index 49fbef3e..6dded957 100644
---
a/integrations/chat-models/anthropic/src/main/java/org/apache/flink/agents/integrations/chatmodels/anthropic/AnthropicChatModelConnection.java
+++
b/integrations/chat-models/anthropic/src/main/java/org/apache/flink/agents/integrations/chatmodels/anthropic/AnthropicChatModelConnection.java
@@ -111,6 +111,11 @@ public class AnthropicChatModelConnection extends
BaseChatModelConnection {
this.client = builder.build();
}
+ /** Closes the underlying {@code client}. */
+ @Override
+ public void close() {
+ this.client.close();
+ }
+
@Override
public ChatMessage chat(
List<ChatMessage> messages,
diff --git
a/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIChatModelConnection.java
b/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIChatModelConnection.java
index 2675d424..b04cd2b2 100644
---
a/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIChatModelConnection.java
+++
b/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIChatModelConnection.java
@@ -451,4 +451,9 @@ public class OpenAIChatModelConnection extends
BaseChatModelConnection {
}
return mapper.convertValue(value, MAP_TYPE);
}
+
+ /** Closes the underlying {@code client}. */
+ @Override
+ public void close() throws Exception {
+ this.client.close();
+ }
}
diff --git
a/integrations/mcp/src/main/java/org/apache/flink/agents/integrations/mcp/MCPPrompt.java
b/integrations/mcp/src/main/java/org/apache/flink/agents/integrations/mcp/MCPPrompt.java
index 8e5c6828..5fd9085a 100644
---
a/integrations/mcp/src/main/java/org/apache/flink/agents/integrations/mcp/MCPPrompt.java
+++
b/integrations/mcp/src/main/java/org/apache/flink/agents/integrations/mcp/MCPPrompt.java
@@ -227,4 +227,9 @@ public class MCPPrompt extends Prompt {
public String toString() {
return String.format("MCPPrompt{name='%s', server='%s'}", name,
mcpServer.getEndpoint());
}
+
+ /**
+ * Closes the {@code mcpServer} this prompt is bound to.
+ *
+ * <p>NOTE(review): if several prompts/tools hold the same server instance, it is closed once per
+ * resource - confirm that closing the server repeatedly is safe.
+ */
+ @Override
+ public void close() throws Exception {
+ this.mcpServer.close();
+ }
}
diff --git
a/integrations/mcp/src/main/java/org/apache/flink/agents/integrations/mcp/MCPTool.java
b/integrations/mcp/src/main/java/org/apache/flink/agents/integrations/mcp/MCPTool.java
index 0ad179ae..286fb2d4 100644
---
a/integrations/mcp/src/main/java/org/apache/flink/agents/integrations/mcp/MCPTool.java
+++
b/integrations/mcp/src/main/java/org/apache/flink/agents/integrations/mcp/MCPTool.java
@@ -125,4 +125,9 @@ public class MCPTool extends Tool {
return String.format(
"MCPTool{name='%s', server='%s'}", metadata.getName(),
mcpServer.getEndpoint());
}
+
+ /**
+ * Closes the {@code mcpServer} this tool is bound to.
+ *
+ * <p>NOTE(review): if several tools/prompts hold the same server instance, it is closed once per
+ * resource - confirm that closing the server repeatedly is safe.
+ */
+ @Override
+ public void close() throws Exception {
+ this.mcpServer.close();
+ }
}
diff --git
a/integrations/vector-stores/elasticsearch/src/main/java/org/apache/flink/agents/integrations/vectorstores/elasticsearch/ElasticsearchVectorStore.java
b/integrations/vector-stores/elasticsearch/src/main/java/org/apache/flink/agents/integrations/vectorstores/elasticsearch/ElasticsearchVectorStore.java
index cf09d55f..ec620bc6 100644
---
a/integrations/vector-stores/elasticsearch/src/main/java/org/apache/flink/agents/integrations/vectorstores/elasticsearch/ElasticsearchVectorStore.java
+++
b/integrations/vector-stores/elasticsearch/src/main/java/org/apache/flink/agents/integrations/vectorstores/elasticsearch/ElasticsearchVectorStore.java
@@ -249,6 +249,11 @@ public class ElasticsearchVectorStore extends
BaseVectorStore
this.client = new ElasticsearchClient(transport);
}
+ /** Closes the underlying Elasticsearch {@code client}. */
+ @Override
+ public void close() throws Exception {
+ this.client.close();
+ }
+
@Override
public Collection getOrCreateCollection(String name, Map<String, Object>
metadata)
throws Exception {
diff --git a/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
b/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
index b27525b8..6526f5a4 100644
--- a/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
+++ b/plan/src/main/java/org/apache/flink/agents/plan/AgentPlan.java
@@ -212,6 +212,15 @@ public class AgentPlan implements Serializable {
return config.getConfData();
}
+    /**
+     * Closes every cached resource and empties the resource cache.
+     *
+     * <p>Every resource is attempted even if an earlier one fails to close, and the cache is
+     * always cleared. The first failure is rethrown afterwards, with any later failures attached
+     * to it as suppressed exceptions.
+     *
+     * @throws Exception the first exception raised while closing a resource
+     */
+    public void close() throws Exception {
+        Exception firstFailure = null;
+        try {
+            for (Map<String, Resource> resources : resourceCache.values()) {
+                for (Resource resource : resources.values()) {
+                    try {
+                        resource.close();
+                    } catch (Exception e) {
+                        // Keep going so one bad resource does not leak all the others.
+                        if (firstFailure == null) {
+                            firstFailure = e;
+                        } else if (e != firstFailure) {
+                            firstFailure.addSuppressed(e);
+                        }
+                    }
+                }
+            }
+        } finally {
+            // Drop the references even on failure so a second close() does not re-close them.
+            resourceCache.clear();
+        }
+        if (firstFailure != null) {
+            throw firstFailure;
+        }
+    }
+
private void writeObject(ObjectOutputStream out) throws IOException {
String serializedStr = new ObjectMapper().writeValueAsString(this);
out.writeUTF(serializedStr);
diff --git a/python/flink_agents/api/resource.py
b/python/flink_agents/api/resource.py
index 090690e0..ff9be303 100644
--- a/python/flink_agents/api/resource.py
+++ b/python/flink_agents/api/resource.py
@@ -90,6 +90,9 @@ class Resource(BaseModel, ABC):
"""
return self._metric_group
+ def close(self) -> None:
+ """Close the resource. The base implementation does nothing; subclasses override it."""
+
class SerializableResource(Resource, ABC):
"""Resource which is serializable."""
diff --git a/python/flink_agents/api/runner_context.py
b/python/flink_agents/api/runner_context.py
index 43d115a4..5461e7fc 100644
--- a/python/flink_agents/api/runner_context.py
+++ b/python/flink_agents/api/runner_context.py
@@ -231,3 +231,7 @@ class RunnerContext(ABC):
ReadableConfiguration
The configuration for flink agents.
"""
+
+ @abstractmethod
+ def close(self) -> None:
+ """Clean up the resources held by this runner context."""
diff --git
a/python/flink_agents/integrations/chat_models/anthropic/anthropic_chat_model.py
b/python/flink_agents/integrations/chat_models/anthropic/anthropic_chat_model.py
index 297fb5e7..e0d1233a 100644
---
a/python/flink_agents/integrations/chat_models/anthropic/anthropic_chat_model.py
+++
b/python/flink_agents/integrations/chat_models/anthropic/anthropic_chat_model.py
@@ -22,6 +22,7 @@ from anthropic import Anthropic
from anthropic._types import NOT_GIVEN
from anthropic.types import MessageParam, TextBlockParam, ToolParam
from pydantic import Field, PrivateAttr
+from typing_extensions import override
from flink_agents.api.chat_message import ChatMessage, MessageRole
from flink_agents.api.chat_models.chat_model import (
@@ -208,6 +209,14 @@ class
AnthropicChatModelConnection(BaseChatModelConnection):
content=message.content[0].text,
)
+ @override
+ def close(self) -> None:
+ """Close the cached client, if any, and reset it to None even if closing raises."""
+ if self._client is not None:
+ try:
+ self._client.close()
+ finally:
+ self._client = None
+
DEFAULT_ANTHROPIC_MODEL = "claude-sonnet-4-20250514"
DEFAULT_MAX_TOKENS = 1024
diff --git
a/python/flink_agents/integrations/chat_models/openai/openai_chat_model.py
b/python/flink_agents/integrations/chat_models/openai/openai_chat_model.py
index a00bbaea..b5e3297a 100644
--- a/python/flink_agents/integrations/chat_models/openai/openai_chat_model.py
+++ b/python/flink_agents/integrations/chat_models/openai/openai_chat_model.py
@@ -20,6 +20,7 @@ from typing import Any, Dict, List, Literal, Sequence
import httpx
from openai import NOT_GIVEN, OpenAI
from pydantic import Field, PrivateAttr
+from typing_extensions import override
from flink_agents.api.chat_message import ChatMessage
from flink_agents.api.chat_models.chat_model import (
@@ -184,6 +185,14 @@ class OpenAIChatModelConnection(BaseChatModelConnection):
return convert_from_openai_message(message)
+ @override
+ def close(self) -> None:
+ """Close the cached client, if any, and reset it to None even if closing raises."""
+ if self._client is not None:
+ try:
+ self._client.close()
+ finally:
+ self._client = None
+
DEFAULT_TEMPERATURE = 0.1
diff --git
a/python/flink_agents/integrations/embedding_models/openai_embedding_model.py
b/python/flink_agents/integrations/embedding_models/openai_embedding_model.py
index 73cf65ff..234ad8d1 100644
---
a/python/flink_agents/integrations/embedding_models/openai_embedding_model.py
+++
b/python/flink_agents/integrations/embedding_models/openai_embedding_model.py
@@ -19,6 +19,7 @@ from typing import Any, Dict, Sequence
from openai import NOT_GIVEN, OpenAI
from pydantic import Field
+from typing_extensions import override
from flink_agents.api.embedding_models.embedding_model import (
BaseEmbeddingModelConnection,
@@ -94,7 +95,7 @@ class
OpenAIEmbeddingModelConnection(BaseEmbeddingModelConnection):
**kwargs,
)
- __client: OpenAI = None
+ __client: OpenAI | None = None
@property
def client(self) -> OpenAI:
@@ -130,6 +131,15 @@ class
OpenAIEmbeddingModelConnection(BaseEmbeddingModelConnection):
embeddings = [list(embedding.embedding) for embedding in response.data]
return embeddings[0] if isinstance(text, str) else embeddings
+ @override
+ def close(self) -> None:
+ """Close the cached OpenAI client, if any, and reset it to None even if closing raises."""
+ if self.__client is not None:
+ try:
+ self.__client.close()
+ finally:
+ self.__client = None
+
class OpenAIEmbeddingModelSetup(BaseEmbeddingModelSetup):
"""The settings for OpenAI embedding model.
diff --git a/python/flink_agents/integrations/mcp/mcp.py
b/python/flink_agents/integrations/mcp/mcp.py
index 32ac4059..f430cbe0 100644
--- a/python/flink_agents/integrations/mcp/mcp.py
+++ b/python/flink_agents/integrations/mcp/mcp.py
@@ -67,6 +67,14 @@ class MCPTool(Tool):
self.mcp_server.call_tool_async(self.metadata.name, *args,
**kwargs)
)
+ @override
+ def close(self) -> None:
+ """Close the MCP server this tool is bound to and drop the reference to it."""
+ # NOTE(review): if several tools/prompts hold the same server object, each of
+ # them will call close() on it - confirm MCPServer.close() is safe to repeat.
+ if self.mcp_server is not None:
+ try:
+ self.mcp_server.close()
+ finally:
+ self.mcp_server = None
+
class MCPPrompt(Prompt):
"""MCP prompt definition that extends the base Prompt class.
@@ -117,6 +125,13 @@ class MCPPrompt(Prompt):
arguments = self._check_arguments(**arguments)
return self.mcp_server.get_prompt(self.name, arguments)
+ @override
+ def close(self) -> None:
+ """Close the MCP server this prompt is bound to and drop the reference to it."""
+ # NOTE(review): if several prompts/tools hold the same server object, each of
+ # them will call close() on it - confirm MCPServer.close() is safe to repeat.
+ if self.mcp_server is not None:
+ try:
+ self.mcp_server.close()
+ finally:
+ self.mcp_server = None
class MCPServer(SerializableResource, ABC):
"""Resource representing an MCP server and exposing its tools/prompts.
@@ -308,6 +323,7 @@ class MCPServer(SerializableResource, ABC):
return chat_messages
+ @override
def close(self) -> None:
"""Close the MCP server connection and clean up resources."""
asyncio.run(self._cleanup_connection())
diff --git a/python/flink_agents/plan/agent_plan.py
b/python/flink_agents/plan/agent_plan.py
index 98de608d..b9180ca1 100644
--- a/python/flink_agents/plan/agent_plan.py
+++ b/python/flink_agents/plan/agent_plan.py
@@ -223,6 +223,14 @@ class AgentPlan(BaseModel):
"""Set java resource adapter for java resource provider."""
self.__j_resource_adapter = j_resource_adapter
+    def close(self) -> None:
+        """Close every cached resource and clear the resource cache.
+
+        Every resource is attempted even if an earlier one fails to close, and
+        the cache is always cleared. The first failure is re-raised afterwards;
+        later failures are logged as warnings.
+        """
+        import logging
+
+        first_error = None
+        try:
+            for resources in self.__resources.values():
+                for resource in resources.values():
+                    try:
+                        resource.close()
+                    except Exception as e:
+                        # Keep going so one bad resource does not leak the rest.
+                        if first_error is None:
+                            first_error = e
+                        else:
+                            logging.getLogger(__name__).warning(
+                                "Failed to close resource", exc_info=e
+                            )
+        finally:
+            # Drop the references even on failure so a second close() is a no-op.
+            self.__resources.clear()
+        if first_error is not None:
+            raise first_error
+
+
def _get_actions(agent: Agent) -> List[Action]:
"""Extract all registered agent actions from an agent.
diff --git a/python/flink_agents/runtime/flink_runner_context.py
b/python/flink_agents/runtime/flink_runner_context.py
index 7c89ef1c..d9ac02a4 100644
--- a/python/flink_agents/runtime/flink_runner_context.py
+++ b/python/flink_agents/runtime/flink_runner_context.py
@@ -46,7 +46,7 @@ class FlinkRunnerContext(RunnerContext):
This context allows access to event handling.
"""
- __agent_plan: AgentPlan
+ __agent_plan: AgentPlan | None
__ltm: BaseLongTermMemory = None
def __init__(
@@ -209,6 +209,14 @@ class FlinkRunnerContext(RunnerContext):
"""
return self.__agent_plan.config
+ @override
+ def close(self) -> None:
+ """Close the agent plan, if still held, and drop the reference even if closing raises."""
+ if self.__agent_plan is not None:
+ try:
+ self.__agent_plan.close()
+ finally:
+ self.__agent_plan = None
+
def create_flink_runner_context(
j_runner_context: Any,
@@ -247,6 +255,12 @@ def flink_runner_context_switch_action_context(
)
)
+def close_flink_runner_context(
+ ctx: FlinkRunnerContext,
+) -> None:
+ """Clean up the resources kept by the flink runner context by calling its close()."""
+ ctx.close()
+
def create_async_thread_pool() -> ThreadPoolExecutor:
"""Used to create a thread pool to execute asynchronous
diff --git a/python/flink_agents/runtime/java/java_chat_model.py
b/python/flink_agents/runtime/java/java_chat_model.py
index 1d432246..2263b68e 100644
--- a/python/flink_agents/runtime/java/java_chat_model.py
+++ b/python/flink_agents/runtime/java/java_chat_model.py
@@ -81,6 +81,10 @@ class JavaChatModelConnectionImpl(JavaChatModelConnection):
return from_java_chat_message(j_response_message)
+ @override
+ def close(self) -> None:
+ """Close this connection by calling close() on the wrapped j_resource."""
+ self.j_resource.close()
+
class JavaChatModelSetupImpl(JavaChatModelSetup):
"""Java-based implementation of ChatModelSetup that bridges Python and
Java chat
diff --git a/python/flink_agents/runtime/java/java_resource_wrapper.py
b/python/flink_agents/runtime/java/java_resource_wrapper.py
index 496aab33..56a101ca 100644
--- a/python/flink_agents/runtime/java/java_resource_wrapper.py
+++ b/python/flink_agents/runtime/java/java_resource_wrapper.py
@@ -62,6 +62,10 @@ class JavaPrompt(Prompt):
extra_args=j_chat_message.getExtraArgs()) for j_chat_message in j_chat_messages]
return chatMessages
+ @override
+ def close(self) -> None:
+ """Close this prompt by calling close() on the wrapped j_prompt."""
+ self.j_prompt.close()
+
class JavaGetResourceWrapper:
"""Python wrapper for Java ResourceAdapter."""
diff --git a/python/flink_agents/runtime/local_runner.py
b/python/flink_agents/runtime/local_runner.py
index 084f7bd0..5ed6157d 100644
--- a/python/flink_agents/runtime/local_runner.py
+++ b/python/flink_agents/runtime/local_runner.py
@@ -57,7 +57,7 @@ class LocalRunnerContext(RunnerContext):
Name of the action being executed.
"""
- __agent_plan: AgentPlan
+ __agent_plan: AgentPlan | None
__key: Any
events: deque[Event]
action_name: str
@@ -221,6 +221,14 @@ class LocalRunnerContext(RunnerContext):
"""Clean up sensory memory."""
self._sensory_mem_store.clear()
+ def close(self) -> None:
+ """Close the agent plan, if still held, and drop the reference even if closing raises."""
+ if self.__agent_plan is not None:
+ try:
+ self.__agent_plan.close()
+ finally:
+ self.__agent_plan = None
+
class LocalRunner(AgentRunner):
"""Agent runner implementation for local execution, which is
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
index 549359f3..2b8d8dc5 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/context/RunnerContextImpl.java
@@ -231,6 +231,8 @@ public class RunnerContextImpl implements RunnerContext {
this.ltm.close();
this.ltm = null;
}
+
+ this.agentPlan.close();
}
public String getActionName() {
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
index 70a49c0e..1b569ac8 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
@@ -643,6 +643,13 @@ public class ActionExecutionOperator<IN, OUT> extends
AbstractStreamOperator<OUT
@Override
public void close() throws Exception {
+ if (runnerContext != null) {
+ try {
+ runnerContext.close();
+ } finally {
+ runnerContext = null;
+ }
+ }
if (pythonActionExecutor != null) {
pythonActionExecutor.close();
}
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
index 4d7792aa..3f9ad702 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/python/utils/PythonActionExecutor.java
@@ -43,6 +43,9 @@ public class PythonActionExecutor {
private static final String FLINK_RUNNER_CONTEXT_SWITCH_ACTION_CONTEXT =
"flink_runner_context.flink_runner_context_switch_action_context";
+ // Name of the Python function invoked on shutdown to close the Python runner context.
+ private static final String CLOSE_FLINK_RUNNER_CONTEXT =
+ "flink_runner_context.close_flink_runner_context";
+
// ========== ASYNC THREAD POOL ===========
private static final String CREATE_ASYNC_THREAD_POOL =
"flink_runner_context.create_async_thread_pool";
@@ -176,6 +179,14 @@ public class PythonActionExecutor {
if (pythonAsyncThreadPool != null) {
interpreter.invoke(CLOSE_ASYNC_THREAD_POOL,
pythonAsyncThreadPool);
}
+
+ if (pythonRunnerContext != null) {
+ try {
+ interpreter.invoke(CLOSE_FLINK_RUNNER_CONTEXT,
pythonRunnerContext);
+ } finally {
+ pythonRunnerContext = null;
+ }
+ }
}
}