This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-agents.git
commit e0801a0657b7c0f6a36eb08bc9031791f1d61860 Author: WenjinXie <[email protected]> AuthorDate: Sat Jan 17 19:53:07 2026 +0800 [runtime] Built-in actions support durable execution. --- .../flink/agents/plan/actions/ChatModelAction.java | 44 +++++++++++----------- .../plan/actions/ContextRetrievalAction.java | 39 +++++++++---------- .../flink/agents/plan/actions/ToolCallAction.java | 41 ++++++++++---------- .../flink_agents/plan/actions/chat_model_action.py | 5 ++- .../plan/actions/context_retrieval_action.py | 5 ++- .../flink_agents/plan/actions/tool_call_action.py | 2 +- 6 files changed, 67 insertions(+), 69 deletions(-) diff --git a/plan/src/main/java/org/apache/flink/agents/plan/actions/ChatModelAction.java b/plan/src/main/java/org/apache/flink/agents/plan/actions/ChatModelAction.java index ff5f6b9c..cc73f2c6 100644 --- a/plan/src/main/java/org/apache/flink/agents/plan/actions/ChatModelAction.java +++ b/plan/src/main/java/org/apache/flink/agents/plan/actions/ChatModelAction.java @@ -215,30 +215,30 @@ public class ChatModelAction { ChatMessage response = null; + DurableCallable<ChatMessage> callable = + new DurableCallable<>() { + @Override + public String getId() { + return "chat"; + } + + @Override + public Class<ChatMessage> getResultClass() { + return ChatMessage.class; + } + + @Override + public ChatMessage call() throws Exception { + return chatModel.chat(messages, Map.of()); + } + }; + for (int attempt = 0; attempt < numRetries + 1; attempt++) { try { - if (chatAsync) { - response = - ctx.durableExecuteAsync( - new DurableCallable<>() { - @Override - public String getId() { - return "chat-async"; - } - - @Override - public Class<ChatMessage> getResultClass() { - return ChatMessage.class; - } - - @Override - public ChatMessage call() throws Exception { - return chatModel.chat(messages, Map.of()); - } - }); - } else { - response = chatModel.chat(messages, Map.of()); - } + response = + chatAsync + ? 
ctx.durableExecuteAsync(callable) + : ctx.durableExecute(callable); // only generate structured output for final response. if (outputSchema != null && response.getToolCalls().isEmpty()) { response = generateStructuredOutput(response, outputSchema); diff --git a/plan/src/main/java/org/apache/flink/agents/plan/actions/ContextRetrievalAction.java b/plan/src/main/java/org/apache/flink/agents/plan/actions/ContextRetrievalAction.java index 1320c93c..420ceafa 100644 --- a/plan/src/main/java/org/apache/flink/agents/plan/actions/ContextRetrievalAction.java +++ b/plan/src/main/java/org/apache/flink/agents/plan/actions/ContextRetrievalAction.java @@ -69,29 +69,26 @@ public class ContextRetrievalAction { contextRetrievalRequestEvent.getQuery(), contextRetrievalRequestEvent.getMaxResults()); - VectorStoreQueryResult result; - if (ragAsync) { - result = - ctx.durableExecuteAsync( - new DurableCallable<VectorStoreQueryResult>() { - @Override - public String getId() { - return "rag-async"; - } + DurableCallable<VectorStoreQueryResult> callable = + new DurableCallable<VectorStoreQueryResult>() { + @Override + public String getId() { + return "rag-async"; + } - @Override - public Class<VectorStoreQueryResult> getResultClass() { - return VectorStoreQueryResult.class; - } + @Override + public Class<VectorStoreQueryResult> getResultClass() { + return VectorStoreQueryResult.class; + } - @Override - public VectorStoreQueryResult call() throws Exception { - return vectorStore.query(vectorStoreQuery); - } - }); - } else { - result = vectorStore.query(vectorStoreQuery); - } + @Override + public VectorStoreQueryResult call() throws Exception { + return vectorStore.query(vectorStoreQuery); + } + }; + + VectorStoreQueryResult result = + ragAsync ? 
ctx.durableExecuteAsync(callable) : ctx.durableExecute(callable); ctx.sendEvent( new ContextRetrievalResponseEvent( diff --git a/plan/src/main/java/org/apache/flink/agents/plan/actions/ToolCallAction.java b/plan/src/main/java/org/apache/flink/agents/plan/actions/ToolCallAction.java index f5fba2da..7a3eff2e 100644 --- a/plan/src/main/java/org/apache/flink/agents/plan/actions/ToolCallAction.java +++ b/plan/src/main/java/org/apache/flink/agents/plan/actions/ToolCallAction.java @@ -75,29 +75,28 @@ public class ToolCallAction { if (tool != null) { try { ToolResponse response; - if (toolCallAsync) { - final Tool toolRef = tool; - response = - ctx.durableExecuteAsync( - new DurableCallable<>() { - @Override - public String getId() { - return "tool-call-async"; - } + final Tool toolRef = tool; + DurableCallable<ToolResponse> callable = + new DurableCallable<>() { + @Override + public String getId() { + return "tool-call"; + } - @Override - public Class<ToolResponse> getResultClass() { - return ToolResponse.class; - } + @Override + public Class<ToolResponse> getResultClass() { + return ToolResponse.class; + } - @Override - public ToolResponse call() throws Exception { - return toolRef.call(new ToolParameters(arguments)); - } - }); - } else { - response = tool.call(new ToolParameters(arguments)); - } + @Override + public ToolResponse call() throws Exception { + return toolRef.call(new ToolParameters(arguments)); + } + }; + response = + toolCallAsync + ? 
ctx.durableExecuteAsync(callable) + : ctx.durableExecute(callable); success.put(id, true); responses.put(id, response); } catch (Exception e) { diff --git a/python/flink_agents/plan/actions/chat_model_action.py b/python/flink_agents/plan/actions/chat_model_action.py index 88806a2e..ffef800e 100644 --- a/python/flink_agents/plan/actions/chat_model_action.py +++ b/python/flink_agents/plan/actions/chat_model_action.py @@ -180,7 +180,8 @@ async def chat( ) chat_async = ctx.config.get(AgentExecutionOptions.CHAT_ASYNC) - # java chat model doesn't support async execution. + # java chat model doesn't support async execution, + # see https://github.com/apache/flink-agents/issues/448 for details. chat_async = chat_async and not isinstance(chat_model, JavaChatModelSetup) error_handling_strategy = ctx.config.get(AgentConfigOptions.ERROR_HANDLING_STRATEGY) @@ -194,7 +195,7 @@ async def chat( if chat_async: response = await ctx.durable_execute_async(chat_model.chat, messages) else: - response = chat_model.chat(messages) + response = ctx.durable_execute(chat_model.chat, messages) if response.extra_args.get("model_name") and response.extra_args.get("promptTokens") and response.extra_args.get("completionTokens"): chat_model._record_token_metrics(response.extra_args["model_name"], response.extra_args["promptTokens"], response.extra_args["completionTokens"]) diff --git a/python/flink_agents/plan/actions/context_retrieval_action.py b/python/flink_agents/plan/actions/context_retrieval_action.py index c862996e..d484e782 100644 --- a/python/flink_agents/plan/actions/context_retrieval_action.py +++ b/python/flink_agents/plan/actions/context_retrieval_action.py @@ -40,7 +40,8 @@ async def process_context_retrieval_request(event: Event, ctx: RunnerContext) -> query = VectorStoreQuery(query_text=event.query, limit=event.max_results) rag_async = ctx.config.get(AgentExecutionOptions.RAG_ASYNC) - # java vector store doesn't support async execution. 
+ # java vector store doesn't support async execution, + # see https://github.com/apache/flink-agents/issues/448 for details. rag_async = rag_async and not isinstance(vector_store, JavaVectorStore) if rag_async: # To avoid https://github.com/alibaba/pemja/issues/88, @@ -48,7 +49,7 @@ async def process_context_retrieval_request(event: Event, ctx: RunnerContext) -> _logger.debug("Processing context retrieval asynchronously.") result = await ctx.durable_execute_async(vector_store.query, query) else: - result = vector_store.query(query) + result = ctx.durable_execute(vector_store.query, query) ctx.send_event( ContextRetrievalResponseEvent( diff --git a/python/flink_agents/plan/actions/tool_call_action.py b/python/flink_agents/plan/actions/tool_call_action.py index fe69f3cd..6fbefaee 100644 --- a/python/flink_agents/plan/actions/tool_call_action.py +++ b/python/flink_agents/plan/actions/tool_call_action.py @@ -48,7 +48,7 @@ async def process_tool_request(event: ToolRequestEvent, ctx: RunnerContext) -> N if tool_call_async: response = await ctx.durable_execute_async(tool.call, **kwargs) else: - response = tool.call(**kwargs) + response = ctx.durable_execute(tool.call, **kwargs) responses[id] = response external_ids[id] = external_id ctx.send_event(
