This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git


The following commit(s) were added to refs/heads/main by this push:
     new 29e64149 [Feature][Java] Add OpenAI Responses API integration (#556)
29e64149 is described below

commit 29e641497bd5ae32726215b788f9bccec9a09daa
Author: Adesh Nalpet Adimurthy <[email protected]>
AuthorDate: Fri Mar 13 05:40:11 2026 -0400

    [Feature][Java] Add OpenAI Responses API integration (#556)
---
 .../flink/agents/api/resource/ResourceName.java    |  18 +-
 .../test/ChatModelIntegrationAgent.java            |  20 +-
 .../integration/test/ChatModelIntegrationTest.java |   2 +-
 ...ction.java => OpenAICompletionsConnection.java} |   6 +-
 ...ModelSetup.java => OpenAICompletionsSetup.java} |  12 +-
 ...on.java => OpenAIResponsesModelConnection.java} | 383 +++++++++++----------
 ...elSetup.java => OpenAIResponsesModelSetup.java} |  93 +++--
 python/flink_agents/api/resource.py                |   9 +-
 8 files changed, 283 insertions(+), 260 deletions(-)

diff --git 
a/api/src/main/java/org/apache/flink/agents/api/resource/ResourceName.java 
b/api/src/main/java/org/apache/flink/agents/api/resource/ResourceName.java
index 399c93b2..1cbbfdf6 100644
--- a/api/src/main/java/org/apache/flink/agents/api/resource/ResourceName.java
+++ b/api/src/main/java/org/apache/flink/agents/api/resource/ResourceName.java
@@ -66,10 +66,16 @@ public final class ResourceName {
                 
"org.apache.flink.agents.integrations.chatmodels.ollama.OllamaChatModelSetup";
 
         // OpenAI
-        public static final String OPENAI_CONNECTION =
-                
"org.apache.flink.agents.integrations.chatmodels.openai.OpenAIChatModelConnection";
-        public static final String OPENAI_SETUP =
-                
"org.apache.flink.agents.integrations.chatmodels.openai.OpenAIChatModelSetup";
+        public static final String OPENAI_COMPLETIONS_CONNECTION =
+                
"org.apache.flink.agents.integrations.chatmodels.openai.OpenAICompletionsConnection";
+        public static final String OPENAI_COMPLETIONS_SETUP =
+                
"org.apache.flink.agents.integrations.chatmodels.openai.OpenAICompletionsSetup";
+
+        // OpenAI Responses API
+        public static final String OPENAI_RESPONSES_CONNECTION =
+                
"org.apache.flink.agents.integrations.chatmodels.openai.OpenAIResponsesModelConnection";
+        public static final String OPENAI_RESPONSES_SETUP =
+                
"org.apache.flink.agents.integrations.chatmodels.openai.OpenAIResponsesModelSetup";
 
         // Python Wrapper
         public static final String PYTHON_WRAPPER_CONNECTION =
@@ -99,9 +105,9 @@ public final class ResourceName {
                     
"flink_agents.integrations.chat_models.ollama_chat_model.OllamaChatModelSetup";
 
             // OpenAI
-            public static final String OPENAI_CONNECTION =
+            public static final String OPENAI_COMPLETIONS_CONNECTION =
                     
"flink_agents.integrations.chat_models.openai.openai_chat_model.OpenAIChatModelConnection";
-            public static final String OPENAI_SETUP =
+            public static final String OPENAI_COMPLETIONS_SETUP =
                     
"flink_agents.integrations.chat_models.openai.openai_chat_model.OpenAIChatModelSetup";
 
             // Tongyi
diff --git 
a/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ChatModelIntegrationAgent.java
 
b/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ChatModelIntegrationAgent.java
index 99977169..e3b10abb 100644
--- 
a/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ChatModelIntegrationAgent.java
+++ 
b/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ChatModelIntegrationAgent.java
@@ -81,9 +81,15 @@ public class ChatModelIntegrationAgent extends Agent {
                     .build();
         } else if (provider.equals("OPENAI")) {
             String apiKey = System.getenv().get("OPENAI_API_KEY");
-            return 
ResourceDescriptor.Builder.newBuilder(ResourceName.ChatModel.OPENAI_CONNECTION)
+            return ResourceDescriptor.Builder.newBuilder(
+                            
ResourceName.ChatModel.OPENAI_COMPLETIONS_CONNECTION)
                     .addInitialArgument("api_key", apiKey)
                     .build();
+        } else if (provider.equals("OPENAI_RESPONSES")) {
+            return ResourceDescriptor.Builder.newBuilder(
+                            ResourceName.ChatModel.OPENAI_RESPONSES_CONNECTION)
+                    .addInitialArgument("api_key", 
System.getenv().get("OPENAI_API_KEY"))
+                    .build();
         } else if (provider.equals("ANTHROPIC")) {
             String apiKey = System.getenv().get("ANTHROPIC_API_KEY");
             return ResourceDescriptor.Builder.newBuilder(
@@ -126,7 +132,17 @@ public class ChatModelIntegrationAgent extends Agent {
                             List.of("calculateBMI", "convertTemperature", 
"createRandomNumber"))
                     .build();
         } else if (provider.equals("OPENAI")) {
-            return 
ResourceDescriptor.Builder.newBuilder(ResourceName.ChatModel.OPENAI_SETUP)
+            return ResourceDescriptor.Builder.newBuilder(
+                            ResourceName.ChatModel.OPENAI_COMPLETIONS_SETUP)
+                    .addInitialArgument("connection", "chatModelConnection")
+                    .addInitialArgument("model", "gpt-4o-mini")
+                    .addInitialArgument(
+                            "tools",
+                            List.of("calculateBMI", "convertTemperature", 
"createRandomNumber"))
+                    .build();
+        } else if (provider.equals("OPENAI_RESPONSES")) {
+            return ResourceDescriptor.Builder.newBuilder(
+                            ResourceName.ChatModel.OPENAI_RESPONSES_SETUP)
                     .addInitialArgument("connection", "chatModelConnection")
                     .addInitialArgument("model", "gpt-4o-mini")
                     .addInitialArgument(
diff --git 
a/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ChatModelIntegrationTest.java
 
b/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ChatModelIntegrationTest.java
index c843b970..75a3d5c1 100644
--- 
a/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ChatModelIntegrationTest.java
+++ 
b/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ChatModelIntegrationTest.java
@@ -53,7 +53,7 @@ public class ChatModelIntegrationTest extends 
OllamaPreparationUtils {
     }
 
     @ParameterizedTest()
-    @ValueSource(strings = {"ANTHROPIC", "AZURE", "OLLAMA", "OPENAI"})
+    @ValueSource(strings = {"ANTHROPIC", "AZURE", "OLLAMA", "OPENAI", 
"OPENAI_RESPONSES"})
     public void testChatModeIntegration(String provider) throws Exception {
         Assumptions.assumeTrue(
                 (OLLAMA.equals(provider) && ollamaReady)
diff --git 
a/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIChatModelConnection.java
 
b/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAICompletionsConnection.java
similarity index 98%
copy from 
integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIChatModelConnection.java
copy to 
integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAICompletionsConnection.java
index b04cd2b2..6ed1b170 100644
--- 
a/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIChatModelConnection.java
+++ 
b/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAICompletionsConnection.java
@@ -80,7 +80,7 @@ import java.util.stream.Collectors;
  * public class MyAgent extends Agent {
  *   @ChatModelConnection
  *   public static ResourceDesc openAI() {
- *     return 
ResourceDescriptor.Builder.newBuilder(OpenAIChatModelConnection.class.getName())
+ *     return 
ResourceDescriptor.Builder.newBuilder(OpenAICompletionsConnection.class.getName())
  *             .addInitialArgument("api_key", System.getenv("OPENAI_API_KEY"))
  *             .addInitialArgument("api_base_url", "https://api.openai.com/v1";)
  *             .addInitialArgument("timeout", 120)
@@ -91,7 +91,7 @@ import java.util.stream.Collectors;
  * }
  * }</pre>
  */
-public class OpenAIChatModelConnection extends BaseChatModelConnection {
+public class OpenAICompletionsConnection extends BaseChatModelConnection {
 
     private static final TypeReference<Map<String, Object>> MAP_TYPE = new 
TypeReference<>() {};
 
@@ -99,7 +99,7 @@ public class OpenAIChatModelConnection extends 
BaseChatModelConnection {
     private final OpenAIClient client;
     private final String defaultModel;
 
-    public OpenAIChatModelConnection(
+    public OpenAICompletionsConnection(
             ResourceDescriptor descriptor, BiFunction<String, ResourceType, 
Resource> getResource) {
         super(descriptor, getResource);
 
diff --git 
a/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIChatModelSetup.java
 
b/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAICompletionsSetup.java
similarity index 95%
copy from 
integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIChatModelSetup.java
copy to 
integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAICompletionsSetup.java
index 270dcb93..13a32ac2 100644
--- 
a/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIChatModelSetup.java
+++ 
b/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAICompletionsSetup.java
@@ -34,7 +34,7 @@ import java.util.function.BiFunction;
  *
  * <p>Responsible for providing per-chat configuration such as model, 
temperature, tool bindings,
  * and additional OpenAI parameters. The setup delegates execution to {@link
- * OpenAIChatModelConnection}.
+ * OpenAICompletionsConnection}.
  *
  * <p>Example usage:
  *
@@ -42,7 +42,7 @@ import java.util.function.BiFunction;
  * public class MyAgent extends Agent {
  *   @ChatModelSetup
  *   public static ResourceDesc openAI() {
- *     return 
ResourceDescriptor.Builder.newBuilder(OpenAIChatModelSetup.class.getName())
+ *     return 
ResourceDescriptor.Builder.newBuilder(OpenAICompletionsSetup.class.getName())
  *             .addInitialArgument("connection", "myOpenAIConnection")
  *             .addInitialArgument("model", "gpt-4o-mini")
  *             .addInitialArgument("temperature", 0.3d)
@@ -58,7 +58,7 @@ import java.util.function.BiFunction;
  * }
  * }</pre>
  */
-public class OpenAIChatModelSetup extends BaseChatModelSetup {
+public class OpenAICompletionsSetup extends BaseChatModelSetup {
 
     private static final String DEFAULT_MODEL = "gpt-3.5-turbo";
     private static final double DEFAULT_TEMPERATURE = 0.1d;
@@ -74,7 +74,7 @@ public class OpenAIChatModelSetup extends BaseChatModelSetup {
     private final String reasoningEffort;
     private final Map<String, Object> additionalArguments;
 
-    public OpenAIChatModelSetup(
+    public OpenAICompletionsSetup(
             ResourceDescriptor descriptor, BiFunction<String, ResourceType, 
Resource> getResource) {
         super(descriptor, getResource);
         this.temperature =
@@ -126,7 +126,7 @@ public class OpenAIChatModelSetup extends 
BaseChatModelSetup {
         }
     }
 
-    public OpenAIChatModelSetup(
+    public OpenAICompletionsSetup(
             String model,
             double temperature,
             Integer maxTokens,
@@ -188,7 +188,7 @@ public class OpenAIChatModelSetup extends 
BaseChatModelSetup {
             Map<String, Object> additionalArguments,
             List<String> tools) {
         ResourceDescriptor.Builder builder =
-                
ResourceDescriptor.Builder.newBuilder(OpenAIChatModelSetup.class.getName())
+                
ResourceDescriptor.Builder.newBuilder(OpenAICompletionsSetup.class.getName())
                         .addInitialArgument("model", model)
                         .addInitialArgument("temperature", temperature);
 
diff --git 
a/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIChatModelConnection.java
 
b/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIResponsesModelConnection.java
similarity index 52%
rename from 
integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIChatModelConnection.java
rename to 
integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIResponsesModelConnection.java
index b04cd2b2..f185d65f 100644
--- 
a/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIChatModelConnection.java
+++ 
b/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIResponsesModelConnection.java
@@ -25,28 +25,15 @@ import com.openai.client.OpenAIClient;
 import com.openai.client.okhttp.OpenAIOkHttpClient;
 import com.openai.core.JsonValue;
 import com.openai.models.ChatModel;
-import com.openai.models.FunctionDefinition;
-import com.openai.models.FunctionParameters;
+import com.openai.models.Reasoning;
 import com.openai.models.ReasoningEffort;
-import com.openai.models.chat.completions.ChatCompletion;
-import com.openai.models.chat.completions.ChatCompletionAssistantMessageParam;
-import com.openai.models.chat.completions.ChatCompletionCreateParams;
-import com.openai.models.chat.completions.ChatCompletionFunctionTool;
-import com.openai.models.chat.completions.ChatCompletionMessage;
-import 
com.openai.models.chat.completions.ChatCompletionMessageFunctionToolCall;
-import com.openai.models.chat.completions.ChatCompletionMessageParam;
-import com.openai.models.chat.completions.ChatCompletionMessageToolCall;
-import com.openai.models.chat.completions.ChatCompletionSystemMessageParam;
-import com.openai.models.chat.completions.ChatCompletionTool;
-import com.openai.models.chat.completions.ChatCompletionToolMessageParam;
-import com.openai.models.chat.completions.ChatCompletionUserMessageParam;
+import com.openai.models.responses.*;
 import org.apache.flink.agents.api.chat.messages.ChatMessage;
 import org.apache.flink.agents.api.chat.messages.MessageRole;
 import org.apache.flink.agents.api.chat.model.BaseChatModelConnection;
 import org.apache.flink.agents.api.resource.Resource;
 import org.apache.flink.agents.api.resource.ResourceDescriptor;
 import org.apache.flink.agents.api.resource.ResourceType;
-import org.apache.flink.agents.api.tools.Tool;
 import org.apache.flink.agents.api.tools.ToolMetadata;
 
 import java.time.Duration;
@@ -57,17 +44,22 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.function.BiFunction;
-import java.util.stream.Collectors;
 
 /**
- * A chat model integration for the OpenAI Chat Completions service using the 
official Java SDK.
+ * A <b>dedicated</b> OpenAI chat model integration using the Responses API.
+ *
+ * <p>Unlike {@link OpenAICompletionsConnection} which uses the Chat 
Completions API and works with
+ * any OpenAI-compatible provider (DeepSeek, DashScope, etc.), this 
implementation uses OpenAI's
+ * Responses API which is specific to OpenAI.
+ *
+ * <p>For OpenAI-compatible providers that only support the Chat Completions 
API, use {@link
+ * OpenAICompletionsConnection} instead.
  *
  * <p>Supported connection parameters:
  *
  * <ul>
  *   <li><b>api_key</b> (required): OpenAI API key
- *   <li><b>api_base_url</b> (optional): Base URL for OpenAI API (defaults to
- *       https://api.openai.com/v1)
+ *   <li><b>api_base_url</b> (optional): Base URL for OpenAI API (useful for 
proxies)
  *   <li><b>timeout</b> (optional): Timeout in seconds for API requests
  *   <li><b>max_retries</b> (optional): Maximum number of retry attempts 
(default: 2)
  *   <li><b>default_headers</b> (optional): Map of default headers to include 
in all requests
@@ -79,27 +71,25 @@ import java.util.stream.Collectors;
  * <pre>{@code
  * public class MyAgent extends Agent {
  *   @ChatModelConnection
- *   public static ResourceDesc openAI() {
- *     return 
ResourceDescriptor.Builder.newBuilder(OpenAIChatModelConnection.class.getName())
+ *   public static ResourceDesc openAIResponses() {
+ *     return 
ResourceDescriptor.Builder.newBuilder(OpenAIResponsesModelConnection.class.getName())
  *             .addInitialArgument("api_key", System.getenv("OPENAI_API_KEY"))
- *             .addInitialArgument("api_base_url", "https://api.openai.com/v1";)
  *             .addInitialArgument("timeout", 120)
  *             .addInitialArgument("max_retries", 3)
- *             .addInitialArgument("default_headers", 
Map.of("X-Custom-Header", "value"))
  *             .build();
  *   }
  * }
  * }</pre>
  */
-public class OpenAIChatModelConnection extends BaseChatModelConnection {
+public class OpenAIResponsesModelConnection extends BaseChatModelConnection {
 
     private static final TypeReference<Map<String, Object>> MAP_TYPE = new 
TypeReference<>() {};
+    private static final ObjectMapper mapper = new ObjectMapper();
 
-    private final ObjectMapper mapper = new ObjectMapper();
     private final OpenAIClient client;
     private final String defaultModel;
 
-    public OpenAIChatModelConnection(
+    public OpenAIResponsesModelConnection(
             ResourceDescriptor descriptor, BiFunction<String, ResourceType, 
Resource> getResource) {
         super(descriptor, getResource);
 
@@ -138,14 +128,15 @@ public class OpenAIChatModelConnection extends 
BaseChatModelConnection {
 
     @Override
     public ChatMessage chat(
-            List<ChatMessage> messages, List<Tool> tools, Map<String, Object> 
arguments) {
+            List<ChatMessage> messages,
+            List<org.apache.flink.agents.api.tools.Tool> tools,
+            Map<String, Object> arguments) {
         try {
-            ChatCompletionCreateParams params = buildRequest(messages, tools, 
arguments);
-            ChatCompletion completion = 
client.chat().completions().create(params);
-            ChatMessage response = convertResponse(completion);
+            ResponseCreateParams params = buildRequest(messages, tools, 
arguments);
+            Response response = client.responses().create(params);
+            ChatMessage result = convertResponse(response);
 
-            // Record token metrics
-            if (completion.usage().isPresent()) {
+            if (response.usage().isPresent()) {
                 String modelName = arguments != null ? (String) 
arguments.get("model") : null;
                 if (modelName == null || modelName.isBlank()) {
                     modelName = this.defaultModel;
@@ -153,19 +144,21 @@ public class OpenAIChatModelConnection extends 
BaseChatModelConnection {
                 if (modelName != null && !modelName.isBlank()) {
                     recordTokenMetrics(
                             modelName,
-                            completion.usage().get().promptTokens(),
-                            completion.usage().get().completionTokens());
+                            response.usage().get().inputTokens(),
+                            response.usage().get().outputTokens());
                 }
             }
 
-            return response;
+            return result;
         } catch (Exception e) {
-            throw new RuntimeException("Failed to call OpenAI chat completions 
API.", e);
+            throw new RuntimeException("Failed to call OpenAI Responses API.", 
e);
         }
     }
 
-    private ChatCompletionCreateParams buildRequest(
-            List<ChatMessage> messages, List<Tool> tools, Map<String, Object> 
rawArguments) {
+    private ResponseCreateParams buildRequest(
+            List<ChatMessage> messages,
+            List<org.apache.flink.agents.api.tools.Tool> tools,
+            Map<String, Object> rawArguments) {
         Map<String, Object> arguments =
                 rawArguments != null ? new HashMap<>(rawArguments) : new 
HashMap<>();
 
@@ -175,13 +168,12 @@ public class OpenAIChatModelConnection extends 
BaseChatModelConnection {
             modelName = this.defaultModel;
         }
 
-        ChatCompletionCreateParams.Builder builder =
-                ChatCompletionCreateParams.builder()
+        List<ResponseInputItem> inputItems = convertInputItems(messages);
+
+        ResponseCreateParams.Builder builder =
+                ResponseCreateParams.builder()
                         .model(ChatModel.of(modelName))
-                        .messages(
-                                messages.stream()
-                                        .map(this::convertToOpenAIMessage)
-                                        .collect(Collectors.toList()));
+                        .inputOfResponse(inputItems);
 
         if (tools != null && !tools.isEmpty()) {
             builder.tools(convertTools(tools, strictMode));
@@ -194,24 +186,25 @@ public class OpenAIChatModelConnection extends 
BaseChatModelConnection {
 
         Object maxTokens = arguments.remove("max_tokens");
         if (maxTokens instanceof Number) {
-            builder.maxCompletionTokens(((Number) maxTokens).longValue());
-        }
-
-        Object logprobs = arguments.remove("logprobs");
-        boolean logprobsEnabled = Boolean.TRUE.equals(logprobs);
-        if (logprobsEnabled) {
-            builder.logprobs(true);
-            Object topLogprobs = arguments.remove("top_logprobs");
-            if (topLogprobs instanceof Number) {
-                builder.topLogprobs(((Number) topLogprobs).longValue());
-            }
-        } else {
-            arguments.remove("top_logprobs");
+            builder.maxOutputTokens(((Number) maxTokens).longValue());
         }
 
         Object reasoningEffort = arguments.remove("reasoning_effort");
         if (reasoningEffort instanceof String) {
-            builder.reasoningEffort(ReasoningEffort.of((String) 
reasoningEffort));
+            builder.reasoning(
+                    Reasoning.builder()
+                            .effort(ReasoningEffort.of((String) 
reasoningEffort))
+                            .build());
+        }
+
+        Object store = arguments.remove("store");
+        if (Boolean.TRUE.equals(store)) {
+            builder.store(true);
+        }
+
+        Object instructions = arguments.remove("instructions");
+        if (instructions instanceof String) {
+            builder.instructions((String) instructions);
         }
 
         @SuppressWarnings("unchecked")
@@ -225,181 +218,191 @@ public class OpenAIChatModelConnection extends 
BaseChatModelConnection {
         return builder.build();
     }
 
-    private List<ChatCompletionTool> convertTools(List<Tool> tools, boolean 
strictMode) {
-        List<ChatCompletionTool> openaiTools = new ArrayList<>(tools.size());
-        for (Tool tool : tools) {
-            ToolMetadata metadata = tool.getMetadata();
-            FunctionDefinition.Builder functionBuilder =
-                    FunctionDefinition.builder()
-                            .name(metadata.getName())
-                            .description(metadata.getDescription());
-
-            String schema = metadata.getInputSchema();
-            if (schema != null && !schema.isBlank()) {
-                functionBuilder.parameters(parseFunctionParameters(schema));
-            }
-
-            if (strictMode) {
-                functionBuilder.strict(true);
-            }
-
-            ChatCompletionFunctionTool functionTool =
-                    ChatCompletionFunctionTool.builder()
-                            .function(functionBuilder.build())
-                            .type(JsonValue.from("function"))
-                            .build();
-
-            openaiTools.add(ChatCompletionTool.ofFunction(functionTool));
+    private List<ResponseInputItem> convertInputItems(List<ChatMessage> 
messages) {
+        List<ResponseInputItem> items = new ArrayList<>();
+        for (ChatMessage message : messages) {
+            items.addAll(convertSingleMessage(message));
         }
-        return openaiTools;
+        return items;
     }
 
-    private FunctionParameters parseFunctionParameters(String schemaJson) {
-        try {
-            JsonNode root = mapper.readTree(schemaJson);
-            if (root == null || !root.isObject()) {
-                return FunctionParameters.builder().build();
-            }
-
-            FunctionParameters.Builder builder = FunctionParameters.builder();
-            root.fields()
-                    .forEachRemaining(
-                            entry ->
-                                    builder.putAdditionalProperty(
-                                            entry.getKey(),
-                                            
JsonValue.fromJsonNode(entry.getValue())));
-            return builder.build();
-        } catch (JsonProcessingException e) {
-            throw new RuntimeException("Failed to parse tool schema JSON.", e);
-        }
-    }
-
-    private ChatCompletionMessageParam convertToOpenAIMessage(ChatMessage 
message) {
+    private List<ResponseInputItem> convertSingleMessage(ChatMessage message) {
+        List<ResponseInputItem> items = new ArrayList<>();
         MessageRole role = message.getRole();
         String content = Optional.ofNullable(message.getContent()).orElse("");
 
         switch (role) {
             case SYSTEM:
-                return ChatCompletionMessageParam.ofSystem(
-                        
ChatCompletionSystemMessageParam.builder().content(content).build());
+                items.add(
+                        ResponseInputItem.ofMessage(
+                                ResponseInputItem.Message.builder()
+                                        
.role(ResponseInputItem.Message.Role.SYSTEM)
+                                        .addInputTextContent(content)
+                                        .build()));
+                break;
+
             case USER:
-                return ChatCompletionMessageParam.ofUser(
-                        
ChatCompletionUserMessageParam.builder().content(content).build());
+                items.add(
+                        ResponseInputItem.ofMessage(
+                                ResponseInputItem.Message.builder()
+                                        
.role(ResponseInputItem.Message.Role.USER)
+                                        .addInputTextContent(content)
+                                        .build()));
+                break;
+
             case ASSISTANT:
-                ChatCompletionAssistantMessageParam.Builder assistantBuilder =
-                        ChatCompletionAssistantMessageParam.builder();
-                if (!content.isEmpty()) {
-                    assistantBuilder.content(content);
-                }
                 List<Map<String, Object>> toolCalls = message.getToolCalls();
                 if (toolCalls != null && !toolCalls.isEmpty()) {
-                    
assistantBuilder.toolCalls(convertAssistantToolCalls(toolCalls));
+                    for (Map<String, Object> call : toolCalls) {
+                        Map<String, Object> functionPayload = 
toMap(call.get("function"));
+                        String responseId = String.valueOf(call.get("id"));
+                        String callId = 
String.valueOf(call.get("original_id"));
+                        String name = 
String.valueOf(functionPayload.get("name"));
+                        String args = 
serializeArguments(functionPayload.get("arguments"));
+
+                        items.add(
+                                ResponseInputItem.ofFunctionCall(
+                                        ResponseFunctionToolCall.builder()
+                                                .id(responseId)
+                                                .callId(callId)
+                                                .name(name)
+                                                .arguments(args)
+                                                
.status(ResponseFunctionToolCall.Status.COMPLETED)
+                                                .build()));
+                    }
                 }
-                Object refusal = message.getExtraArgs().get("refusal");
-                if (refusal instanceof String) {
-                    assistantBuilder.refusal((String) refusal);
+                if (!content.isEmpty()) {
+                    items.add(
+                            ResponseInputItem.ofEasyInputMessage(
+                                    EasyInputMessage.builder()
+                                            
.role(EasyInputMessage.Role.ASSISTANT)
+                                            .content(content)
+                                            .build()));
                 }
-                return 
ChatCompletionMessageParam.ofAssistant(assistantBuilder.build());
+                break;
+
             case TOOL:
-                ChatCompletionToolMessageParam.Builder toolBuilder =
-                        
ChatCompletionToolMessageParam.builder().content(content);
                 Object toolCallId = message.getExtraArgs().get("externalId");
                 if (toolCallId == null) {
                     throw new IllegalArgumentException(
                             "Tool message must have an externalId in 
extraArgs.");
                 }
-                toolBuilder.toolCallId(toolCallId.toString());
-                return ChatCompletionMessageParam.ofTool(toolBuilder.build());
+                items.add(
+                        ResponseInputItem.ofFunctionCallOutput(
+                                ResponseInputItem.FunctionCallOutput.builder()
+                                        .callId(toolCallId.toString())
+                                        .output(content)
+                                        .build()));
+                break;
+
             default:
                 throw new IllegalArgumentException("Unsupported role: " + 
role);
         }
+        return items;
     }
 
-    private List<ChatCompletionMessageToolCall> convertAssistantToolCalls(
-            List<Map<String, Object>> toolCalls) {
-        List<ChatCompletionMessageToolCall> result = new 
ArrayList<>(toolCalls.size());
-        for (Map<String, Object> call : toolCalls) {
-            Object type = call.getOrDefault("type", "function");
-            if (!"function".equals(String.valueOf(type))) {
-                continue;
-            }
-
-            Map<String, Object> functionPayload = toMap(call.get("function"));
-            ChatCompletionMessageFunctionToolCall.Function.Builder 
functionBuilder =
-                    ChatCompletionMessageFunctionToolCall.Function.builder();
-
-            Object functionName = functionPayload.get("name");
-            if (functionName != null) {
-                functionBuilder.name(functionName.toString());
-            }
-
-            Object arguments = functionPayload.get("arguments");
-            functionBuilder.arguments(serializeArguments(arguments));
+    private List<Tool> convertTools(
+            List<org.apache.flink.agents.api.tools.Tool> tools, boolean 
strictMode) {
+        List<Tool> responsesTools = new ArrayList<>(tools.size());
+        for (org.apache.flink.agents.api.tools.Tool tool : tools) {
+            ToolMetadata metadata = tool.getMetadata();
+            FunctionTool.Builder functionBuilder =
+                    FunctionTool.builder()
+                            .name(metadata.getName())
+                            .description(metadata.getDescription());
 
-            Object idObj = call.get("id");
-            if (idObj == null) {
-                throw new IllegalArgumentException("Tool call must have an 
id.");
+            String schema = metadata.getInputSchema();
+            if (schema != null && !schema.isBlank()) {
+                functionBuilder.parameters(parseToolParameters(schema));
             }
-            String toolCallId = idObj.toString();
 
-            ChatCompletionMessageFunctionToolCall.Builder toolCallBuilder =
-                    ChatCompletionMessageFunctionToolCall.builder()
-                            .id(toolCallId)
-                            .function(functionBuilder.build())
-                            .type(JsonValue.from(String.valueOf(type)));
+            functionBuilder.strict(strictMode);
 
-            
result.add(ChatCompletionMessageToolCall.ofFunction(toolCallBuilder.build()));
+            responsesTools.add(Tool.ofFunction(functionBuilder.build()));
         }
-        return result;
+        return responsesTools;
     }
 
-    private ChatMessage convertResponse(ChatCompletion completion) {
-        List<ChatCompletion.Choice> choices = completion.choices();
-        if (choices.isEmpty()) {
-            throw new IllegalStateException("OpenAI response did not contain 
any choices.");
+    private ChatMessage convertResponse(Response response) {
+        List<ResponseOutputItem> output = response.output();
+        if (output == null || output.isEmpty()) {
+            throw new IllegalStateException("OpenAI Responses API did not 
return any output.");
         }
 
-        ChatCompletionMessage message = choices.get(0).message();
-        String content = message.content().orElse("");
-        ChatMessage response = ChatMessage.assistant(content);
+        StringBuilder textContent = new StringBuilder();
+        StringBuilder refusalContent = new StringBuilder();
+        List<Map<String, Object>> toolCalls = new ArrayList<>();
+
+        for (ResponseOutputItem item : output) {
+            if (item.isMessage()) {
+                ResponseOutputMessage msg = item.asMessage();
+                for (ResponseOutputMessage.Content contentBlock : 
msg.content()) {
+                    if (contentBlock.isOutputText()) {
+                        textContent.append(contentBlock.asOutputText().text());
+                    } else if (contentBlock.isRefusal()) {
+                        
refusalContent.append(contentBlock.asRefusal().refusal());
+                    }
+                }
+            } else if (item.isFunctionCall()) {
+                ResponseFunctionToolCall fc = item.asFunctionCall();
+                Map<String, Object> callMap = new LinkedHashMap<>();
+
+                String callId = fc.callId();
+                if (callId == null || callId.isBlank()) {
+                    throw new IllegalStateException(
+                            "OpenAI Responses API returned a function call 
without a call_id.");
+                }
 
-        message.refusal().ifPresent(refusal -> 
response.getExtraArgs().put("refusal", refusal));
+                callMap.put(
+                        "id",
+                        fc.id()
+                                .orElseThrow(
+                                        () ->
+                                                new IllegalStateException(
+                                                        "OpenAI Responses API 
returned a function call without an id.")));
+                callMap.put("type", "function");
+
+                Map<String, Object> functionMap = new LinkedHashMap<>();
+                functionMap.put("name", fc.name());
+                functionMap.put("arguments", parseArguments(fc.arguments()));
+                callMap.put("function", functionMap);
+                callMap.put("original_id", callId);
+
+                toolCalls.add(callMap);
+            }
+        }
 
-        List<ChatCompletionMessageToolCall> toolCalls = 
message.toolCalls().orElse(List.of());
+        ChatMessage result = ChatMessage.assistant(textContent.toString());
         if (!toolCalls.isEmpty()) {
-            response.setToolCalls(convertResponseToolCalls(toolCalls));
+            result.setToolCalls(toolCalls);
         }
 
-        return response;
-    }
-
-    private List<Map<String, Object>> convertResponseToolCalls(
-            List<ChatCompletionMessageToolCall> toolCalls) {
-        List<Map<String, Object>> result = new ArrayList<>(toolCalls.size());
-        for (ChatCompletionMessageToolCall toolCall : toolCalls) {
-            if (!toolCall.isFunction()) {
-                continue;
-            }
+        if (refusalContent.length() > 0) {
+            result.getExtraArgs().put("refusal", refusalContent.toString());
+        }
 
-            ChatCompletionMessageFunctionToolCall functionToolCall = 
toolCall.asFunction();
-            Map<String, Object> callMap = new LinkedHashMap<>();
-            String toolCallId = functionToolCall.id();
-            if (toolCallId == null || toolCallId.isBlank()) {
-                throw new IllegalStateException("OpenAI tool call ID is null 
or empty.");
-            }
+        result.getExtraArgs().put("response_id", response.id());
 
-            callMap.put("id", toolCallId);
-            callMap.put("type", "function");
+        return result;
+    }
 
-            ChatCompletionMessageFunctionToolCall.Function function = 
functionToolCall.function();
-            Map<String, Object> functionMap = new LinkedHashMap<>();
-            functionMap.put("name", function.name());
-            functionMap.put("arguments", parseArguments(function.arguments()));
-            callMap.put("function", functionMap);
-            callMap.put("original_id", toolCallId);
-            result.add(callMap);
+    private FunctionTool.Parameters parseToolParameters(String schemaJson) {
+        try {
+            JsonNode root = mapper.readTree(schemaJson);
+            if (root == null || !root.isObject()) {
+                return FunctionTool.Parameters.builder().build();
+            }
+            FunctionTool.Parameters.Builder builder = 
FunctionTool.Parameters.builder();
+            root.fields()
+                    .forEachRemaining(
+                            entry ->
+                                    builder.putAdditionalProperty(
+                                            entry.getKey(),
+                                            
JsonValue.fromJsonNode(entry.getValue())));
+            return builder.build();
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException("Failed to parse tool schema JSON.", e);
         }
-        return result;
     }
 
     private Map<String, Object> parseArguments(String arguments) {
diff --git 
a/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIChatModelSetup.java
 
b/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIResponsesModelSetup.java
similarity index 74%
rename from 
integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIChatModelSetup.java
rename to 
integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIResponsesModelSetup.java
index 270dcb93..54e7d49b 100644
--- 
a/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIChatModelSetup.java
+++ 
b/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIResponsesModelSetup.java
@@ -30,53 +30,52 @@ import java.util.Set;
 import java.util.function.BiFunction;
 
 /**
- * Chat model setup for the OpenAI Chat Completions API.
+ * Chat model setup for the OpenAI Responses API.
  *
- * <p>Responsible for providing per-chat configuration such as model, 
temperature, tool bindings,
- * and additional OpenAI parameters. The setup delegates execution to {@link
- * OpenAIChatModelConnection}.
+ * <p>Responsible for providing per-chat configuration such as model, 
temperature, max tokens, tool
+ * bindings, and Responses API-specific parameters. The setup delegates 
execution to {@link
+ * OpenAIResponsesModelConnection}.
  *
  * <p>Example usage:
  *
  * <pre>{@code
  * public class MyAgent extends Agent {
  *   @ChatModelSetup
- *   public static ResourceDesc openAI() {
- *     return 
ResourceDescriptor.Builder.newBuilder(OpenAIChatModelSetup.class.getName())
- *             .addInitialArgument("connection", "myOpenAIConnection")
- *             .addInitialArgument("model", "gpt-4o-mini")
+ *   public static ResourceDesc openAIResponses() {
+ *     return 
ResourceDescriptor.Builder.newBuilder(OpenAIResponsesModelSetup.class.getName())
+ *             .addInitialArgument("connection", "myOpenAIResponsesConnection")
+ *             .addInitialArgument("model", "gpt-4o")
  *             .addInitialArgument("temperature", 0.3d)
- *             .addInitialArgument("max_tokens", 500)
+ *             .addInitialArgument("max_tokens", 2048)
  *             .addInitialArgument("strict", true)
  *             .addInitialArgument("reasoning_effort", "medium")
- *             .addInitialArgument("tools", List.of("convertTemperature", 
"calculateBMI"))
- *             .addInitialArgument(
- *                     "additional_kwargs",
- *                     Map.of("seed", 42, "user", "user-123"))
+ *             .addInitialArgument("store", true)
+ *             .addInitialArgument("tools", List.of("getWeather", "searchDB"))
  *             .build();
  *   }
  * }
  * }</pre>
  */
-public class OpenAIChatModelSetup extends BaseChatModelSetup {
+public class OpenAIResponsesModelSetup extends BaseChatModelSetup {
 
-    private static final String DEFAULT_MODEL = "gpt-3.5-turbo";
+    private static final String DEFAULT_MODEL = "gpt-4o";
     private static final double DEFAULT_TEMPERATURE = 0.1d;
-    private static final int DEFAULT_TOP_LOGPROBS = 0;
     private static final boolean DEFAULT_STRICT = false;
+    private static final boolean DEFAULT_STORE = false;
     private static final Set<String> VALID_REASONING_EFFORTS = Set.of("low", 
"medium", "high");
 
     private final Double temperature;
     private final Integer maxTokens;
-    private final Boolean logprobs;
-    private final Integer topLogprobs;
     private final Boolean strict;
     private final String reasoningEffort;
+    private final Boolean store;
+    private final String instructions;
     private final Map<String, Object> additionalArguments;
 
-    public OpenAIChatModelSetup(
+    public OpenAIResponsesModelSetup(
             ResourceDescriptor descriptor, BiFunction<String, ResourceType, 
Resource> getResource) {
         super(descriptor, getResource);
+
         this.temperature =
                 
Optional.ofNullable(descriptor.<Number>getArgument("temperature"))
                         .map(Number::doubleValue)
@@ -93,16 +92,6 @@ public class OpenAIChatModelSetup extends BaseChatModelSetup 
{
             throw new IllegalArgumentException("max_tokens must be greater 
than 0");
         }
 
-        this.logprobs = descriptor.getArgument("logprobs");
-
-        this.topLogprobs =
-                
Optional.ofNullable(descriptor.<Number>getArgument("top_logprobs"))
-                        .map(Number::intValue)
-                        .orElse(DEFAULT_TOP_LOGPROBS);
-        if (this.topLogprobs < 0 || this.topLogprobs > 20) {
-            throw new IllegalArgumentException("top_logprobs must be between 0 
and 20");
-        }
-
         this.strict =
                 Optional.ofNullable(descriptor.<Boolean>getArgument("strict"))
                         .orElse(DEFAULT_STRICT);
@@ -114,26 +103,30 @@ public class OpenAIChatModelSetup extends 
BaseChatModelSetup {
                     "reasoning_effort must be one of: low, medium, high");
         }
 
-        Map<String, Object> additional =
+        this.store =
+                
Optional.ofNullable(descriptor.<Boolean>getArgument("store")).orElse(DEFAULT_STORE);
+
+        this.instructions = descriptor.getArgument("instructions");
+
+        this.additionalArguments =
                 Optional.ofNullable(
                                 descriptor.<Map<String, 
Object>>getArgument("additional_kwargs"))
                         .map(HashMap::new)
                         .orElseGet(HashMap::new);
-        this.additionalArguments = additional;
 
         if (this.model == null || this.model.isBlank()) {
             this.model = DEFAULT_MODEL;
         }
     }
 
-    public OpenAIChatModelSetup(
+    public OpenAIResponsesModelSetup(
             String model,
             double temperature,
             Integer maxTokens,
-            Boolean logprobs,
-            Integer topLogprobs,
             Boolean strict,
             String reasoningEffort,
+            Boolean store,
+            String instructions,
             Map<String, Object> additionalArguments,
             List<String> tools,
             BiFunction<String, ResourceType, Resource> getResource) {
@@ -142,10 +135,10 @@ public class OpenAIChatModelSetup extends 
BaseChatModelSetup {
                         model,
                         temperature,
                         maxTokens,
-                        logprobs,
-                        topLogprobs,
                         strict,
                         reasoningEffort,
+                        store,
+                        instructions,
                         additionalArguments,
                         tools),
                 getResource);
@@ -161,16 +154,18 @@ public class OpenAIChatModelSetup extends 
BaseChatModelSetup {
         if (maxTokens != null) {
             parameters.put("max_tokens", maxTokens);
         }
-        if (Boolean.TRUE.equals(logprobs)) {
-            parameters.put("logprobs", logprobs);
-            parameters.put("top_logprobs", topLogprobs);
-        }
         if (strict) {
             parameters.put("strict", strict);
         }
         if (reasoningEffort != null) {
             parameters.put("reasoning_effort", reasoningEffort);
         }
+        if (store) {
+            parameters.put("store", store);
+        }
+        if (instructions != null) {
+            parameters.put("instructions", instructions);
+        }
         if (additionalArguments != null && !additionalArguments.isEmpty()) {
             parameters.put("additional_kwargs", additionalArguments);
         }
@@ -181,32 +176,32 @@ public class OpenAIChatModelSetup extends 
BaseChatModelSetup {
             String model,
             double temperature,
             Integer maxTokens,
-            Boolean logprobs,
-            Integer topLogprobs,
             Boolean strict,
             String reasoningEffort,
+            Boolean store,
+            String instructions,
             Map<String, Object> additionalArguments,
             List<String> tools) {
         ResourceDescriptor.Builder builder =
-                
ResourceDescriptor.Builder.newBuilder(OpenAIChatModelSetup.class.getName())
+                
ResourceDescriptor.Builder.newBuilder(OpenAIResponsesModelSetup.class.getName())
                         .addInitialArgument("model", model)
                         .addInitialArgument("temperature", temperature);
 
         if (maxTokens != null) {
             builder.addInitialArgument("max_tokens", maxTokens);
         }
-        if (logprobs != null) {
-            builder.addInitialArgument("logprobs", logprobs);
-        }
-        if (topLogprobs != null) {
-            builder.addInitialArgument("top_logprobs", topLogprobs);
-        }
         if (strict != null) {
             builder.addInitialArgument("strict", strict);
         }
         if (reasoningEffort != null) {
             builder.addInitialArgument("reasoning_effort", reasoningEffort);
         }
+        if (store != null) {
+            builder.addInitialArgument("store", store);
+        }
+        if (instructions != null) {
+            builder.addInitialArgument("instructions", instructions);
+        }
         if (additionalArguments != null && !additionalArguments.isEmpty()) {
             builder.addInitialArgument("additional_kwargs", 
additionalArguments);
         }
diff --git a/python/flink_agents/api/resource.py 
b/python/flink_agents/api/resource.py
index 76ebf937..a5b1dd86 100644
--- a/python/flink_agents/api/resource.py
+++ b/python/flink_agents/api/resource.py
@@ -271,9 +271,12 @@ class ResourceName:
             OLLAMA_CONNECTION = 
"org.apache.flink.agents.integrations.chatmodels.ollama.OllamaChatModelConnection"
             OLLAMA_SETUP = 
"org.apache.flink.agents.integrations.chatmodels.ollama.OllamaChatModelSetup"
 
-            # OpenAI
-            OPENAI_CONNECTION = 
"org.apache.flink.agents.integrations.chatmodels.openai.OpenAIChatModelConnection"
-            OPENAI_SETUP = 
"org.apache.flink.agents.integrations.chatmodels.openai.OpenAIChatModelSetup"
+            # OpenAI Completions
+            OPENAI_COMPLETIONS_CONNECTION = 
"org.apache.flink.agents.integrations.chatmodels.openai.OpenAICompletionsConnection"
+            OPENAI_COMPLETIONS_SETUP = 
"org.apache.flink.agents.integrations.chatmodels.openai.OpenAICompletionsSetup"
+
+            OPENAI_RESPONSES_CONNECTION = 
"org.apache.flink.agents.integrations.chatmodels.openai.OpenAIResponsesModelConnection"
+            OPENAI_RESPONSES_SETUP = 
"org.apache.flink.agents.integrations.chatmodels.openai.OpenAIResponsesModelSetup"
 
     class EmbeddingModel:
         """EmbeddingModel resource names."""

Reply via email to