This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git

commit e0801a0657b7c0f6a36eb08bc9031791f1d61860
Author: WenjinXie <[email protected]>
AuthorDate: Sat Jan 17 19:53:07 2026 +0800

    [runtime] Built-in actions support durable execution.
---
 .../flink/agents/plan/actions/ChatModelAction.java | 44 +++++++++++-----------
 .../plan/actions/ContextRetrievalAction.java       | 39 +++++++++----------
 .../flink/agents/plan/actions/ToolCallAction.java  | 41 ++++++++++----------
 .../flink_agents/plan/actions/chat_model_action.py |  5 ++-
 .../plan/actions/context_retrieval_action.py       |  5 ++-
 .../flink_agents/plan/actions/tool_call_action.py  |  2 +-
 6 files changed, 67 insertions(+), 69 deletions(-)

diff --git a/plan/src/main/java/org/apache/flink/agents/plan/actions/ChatModelAction.java b/plan/src/main/java/org/apache/flink/agents/plan/actions/ChatModelAction.java
index ff5f6b9c..cc73f2c6 100644
--- a/plan/src/main/java/org/apache/flink/agents/plan/actions/ChatModelAction.java
+++ b/plan/src/main/java/org/apache/flink/agents/plan/actions/ChatModelAction.java
@@ -215,30 +215,30 @@ public class ChatModelAction {
 
         ChatMessage response = null;
 
+        DurableCallable<ChatMessage> callable =
+                new DurableCallable<>() {
+                    @Override
+                    public String getId() {
+                        return "chat";
+                    }
+
+                    @Override
+                    public Class<ChatMessage> getResultClass() {
+                        return ChatMessage.class;
+                    }
+
+                    @Override
+                    public ChatMessage call() throws Exception {
+                        return chatModel.chat(messages, Map.of());
+                    }
+                };
+
         for (int attempt = 0; attempt < numRetries + 1; attempt++) {
             try {
-                if (chatAsync) {
-                    response =
-                            ctx.durableExecuteAsync(
-                                    new DurableCallable<>() {
-                                        @Override
-                                        public String getId() {
-                                            return "chat-async";
-                                        }
-
-                                        @Override
-                                        public Class<ChatMessage> 
getResultClass() {
-                                            return ChatMessage.class;
-                                        }
-
-                                        @Override
-                                        public ChatMessage call() throws 
Exception {
-                                            return chatModel.chat(messages, 
Map.of());
-                                        }
-                                    });
-                } else {
-                    response = chatModel.chat(messages, Map.of());
-                }
+                response =
+                        chatAsync
+                                ? ctx.durableExecuteAsync(callable)
+                                : ctx.durableExecute(callable);
                 // only generate structured output for final response.
                 if (outputSchema != null && response.getToolCalls().isEmpty()) 
{
                     response = generateStructuredOutput(response, 
outputSchema);
diff --git a/plan/src/main/java/org/apache/flink/agents/plan/actions/ContextRetrievalAction.java b/plan/src/main/java/org/apache/flink/agents/plan/actions/ContextRetrievalAction.java
index 1320c93c..420ceafa 100644
--- a/plan/src/main/java/org/apache/flink/agents/plan/actions/ContextRetrievalAction.java
+++ b/plan/src/main/java/org/apache/flink/agents/plan/actions/ContextRetrievalAction.java
@@ -69,29 +69,26 @@ public class ContextRetrievalAction {
                             contextRetrievalRequestEvent.getQuery(),
                             contextRetrievalRequestEvent.getMaxResults());
 
-            VectorStoreQueryResult result;
-            if (ragAsync) {
-                result =
-                        ctx.durableExecuteAsync(
-                                new DurableCallable<VectorStoreQueryResult>() {
-                                    @Override
-                                    public String getId() {
-                                        return "rag-async";
-                                    }
+            DurableCallable<VectorStoreQueryResult> callable =
+                    new DurableCallable<VectorStoreQueryResult>() {
+                        @Override
+                        public String getId() {
+                            return "rag-async";
+                        }
 
-                                    @Override
-                                    public Class<VectorStoreQueryResult> 
getResultClass() {
-                                        return VectorStoreQueryResult.class;
-                                    }
+                        @Override
+                        public Class<VectorStoreQueryResult> getResultClass() {
+                            return VectorStoreQueryResult.class;
+                        }
 
-                                    @Override
-                                    public VectorStoreQueryResult call() 
throws Exception {
-                                        return 
vectorStore.query(vectorStoreQuery);
-                                    }
-                                });
-            } else {
-                result = vectorStore.query(vectorStoreQuery);
-            }
+                        @Override
+                        public VectorStoreQueryResult call() throws Exception {
+                            return vectorStore.query(vectorStoreQuery);
+                        }
+                    };
+
+            VectorStoreQueryResult result =
+                    ragAsync ? ctx.durableExecuteAsync(callable) : 
ctx.durableExecute(callable);
 
             ctx.sendEvent(
                     new ContextRetrievalResponseEvent(
diff --git a/plan/src/main/java/org/apache/flink/agents/plan/actions/ToolCallAction.java b/plan/src/main/java/org/apache/flink/agents/plan/actions/ToolCallAction.java
index f5fba2da..7a3eff2e 100644
--- a/plan/src/main/java/org/apache/flink/agents/plan/actions/ToolCallAction.java
+++ b/plan/src/main/java/org/apache/flink/agents/plan/actions/ToolCallAction.java
@@ -75,29 +75,28 @@ public class ToolCallAction {
             if (tool != null) {
                 try {
                     ToolResponse response;
-                    if (toolCallAsync) {
-                        final Tool toolRef = tool;
-                        response =
-                                ctx.durableExecuteAsync(
-                                        new DurableCallable<>() {
-                                            @Override
-                                            public String getId() {
-                                                return "tool-call-async";
-                                            }
+                    final Tool toolRef = tool;
+                    DurableCallable<ToolResponse> callable =
+                            new DurableCallable<>() {
+                                @Override
+                                public String getId() {
+                                    return "tool-call";
+                                }
 
-                                            @Override
-                                            public Class<ToolResponse> 
getResultClass() {
-                                                return ToolResponse.class;
-                                            }
+                                @Override
+                                public Class<ToolResponse> getResultClass() {
+                                    return ToolResponse.class;
+                                }
 
-                                            @Override
-                                            public ToolResponse call() throws 
Exception {
-                                                return toolRef.call(new 
ToolParameters(arguments));
-                                            }
-                                        });
-                    } else {
-                        response = tool.call(new ToolParameters(arguments));
-                    }
+                                @Override
+                                public ToolResponse call() throws Exception {
+                                    return toolRef.call(new 
ToolParameters(arguments));
+                                }
+                            };
+                    response =
+                            toolCallAsync
+                                    ? ctx.durableExecuteAsync(callable)
+                                    : ctx.durableExecute(callable);
                     success.put(id, true);
                     responses.put(id, response);
                 } catch (Exception e) {
diff --git a/python/flink_agents/plan/actions/chat_model_action.py b/python/flink_agents/plan/actions/chat_model_action.py
index 88806a2e..ffef800e 100644
--- a/python/flink_agents/plan/actions/chat_model_action.py
+++ b/python/flink_agents/plan/actions/chat_model_action.py
@@ -180,7 +180,8 @@ async def chat(
     )
 
     chat_async = ctx.config.get(AgentExecutionOptions.CHAT_ASYNC)
-    # java chat model doesn't support async execution.
+    # java chat model doesn't support async execution,
+    # see https://github.com/apache/flink-agents/issues/448 for details.
     chat_async = chat_async and not isinstance(chat_model, JavaChatModelSetup)
 
     error_handling_strategy = 
ctx.config.get(AgentConfigOptions.ERROR_HANDLING_STRATEGY)
@@ -194,7 +195,7 @@ async def chat(
             if chat_async:
                 response = await ctx.durable_execute_async(chat_model.chat, 
messages)
             else:
-                response = chat_model.chat(messages)
+                response = ctx.durable_execute(chat_model.chat, messages)
 
             if response.extra_args.get("model_name") and 
response.extra_args.get("promptTokens") and 
response.extra_args.get("completionTokens"):
                 
chat_model._record_token_metrics(response.extra_args["model_name"], 
response.extra_args["promptTokens"], response.extra_args["completionTokens"])
diff --git a/python/flink_agents/plan/actions/context_retrieval_action.py b/python/flink_agents/plan/actions/context_retrieval_action.py
index c862996e..d484e782 100644
--- a/python/flink_agents/plan/actions/context_retrieval_action.py
+++ b/python/flink_agents/plan/actions/context_retrieval_action.py
@@ -40,7 +40,8 @@ async def process_context_retrieval_request(event: Event, ctx: RunnerContext) ->
         query = VectorStoreQuery(query_text=event.query, 
limit=event.max_results)
 
         rag_async = ctx.config.get(AgentExecutionOptions.RAG_ASYNC)
-        # java vector store doesn't support async execution.
+        # java vector store doesn't support async execution
+        # see https://github.com/apache/flink-agents/issues/448 for details.
         rag_async = rag_async and not isinstance(vector_store, JavaVectorStore)
         if rag_async:
             # To avoid https://github.com/alibaba/pemja/issues/88,
@@ -48,7 +49,7 @@ async def process_context_retrieval_request(event: Event, ctx: RunnerContext) ->
             _logger.debug("Processing context retrieval asynchronously.")
             result = await ctx.durable_execute_async(vector_store.query, query)
         else:
-            result = vector_store.query(query)
+            result = ctx.durable_execute(vector_store.query, query)
 
         ctx.send_event(
             ContextRetrievalResponseEvent(
diff --git a/python/flink_agents/plan/actions/tool_call_action.py b/python/flink_agents/plan/actions/tool_call_action.py
index fe69f3cd..6fbefaee 100644
--- a/python/flink_agents/plan/actions/tool_call_action.py
+++ b/python/flink_agents/plan/actions/tool_call_action.py
@@ -48,7 +48,7 @@ async def process_tool_request(event: ToolRequestEvent, ctx: RunnerContext) -> N
             if tool_call_async:
                 response = await ctx.durable_execute_async(tool.call, **kwargs)
             else:
-                response = tool.call(**kwargs)
+                response = ctx.durable_execute(tool.call, **kwargs)
         responses[id] = response
         external_ids[id] = external_id
     ctx.send_event(

Reply via email to