This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push:
new 29e64149 [Feature][Java] Add OpenAI Responses API integration (#556)
29e64149 is described below
commit 29e641497bd5ae32726215b788f9bccec9a09daa
Author: Adesh Nalpet Adimurthy <[email protected]>
AuthorDate: Fri Mar 13 05:40:11 2026 -0400
[Feature][Java] Add OpenAI Responses API integration (#556)
---
.../flink/agents/api/resource/ResourceName.java | 18 +-
.../test/ChatModelIntegrationAgent.java | 20 +-
.../integration/test/ChatModelIntegrationTest.java | 2 +-
...ction.java => OpenAICompletionsConnection.java} | 6 +-
...ModelSetup.java => OpenAICompletionsSetup.java} | 12 +-
...on.java => OpenAIResponsesModelConnection.java} | 383 +++++++++++----------
...elSetup.java => OpenAIResponsesModelSetup.java} | 93 +++--
python/flink_agents/api/resource.py | 9 +-
8 files changed, 283 insertions(+), 260 deletions(-)
diff --git a/api/src/main/java/org/apache/flink/agents/api/resource/ResourceName.java b/api/src/main/java/org/apache/flink/agents/api/resource/ResourceName.java
index 399c93b2..1cbbfdf6 100644
--- a/api/src/main/java/org/apache/flink/agents/api/resource/ResourceName.java
+++ b/api/src/main/java/org/apache/flink/agents/api/resource/ResourceName.java
@@ -66,10 +66,16 @@ public final class ResourceName {
"org.apache.flink.agents.integrations.chatmodels.ollama.OllamaChatModelSetup";
// OpenAI
- public static final String OPENAI_CONNECTION =
-
"org.apache.flink.agents.integrations.chatmodels.openai.OpenAIChatModelConnection";
- public static final String OPENAI_SETUP =
-
"org.apache.flink.agents.integrations.chatmodels.openai.OpenAIChatModelSetup";
+ public static final String OPENAI_COMPLETIONS_CONNECTION =
+
"org.apache.flink.agents.integrations.chatmodels.openai.OpenAICompletionsConnection";
+ public static final String OPENAI_COMPLETIONS_SETUP =
+
"org.apache.flink.agents.integrations.chatmodels.openai.OpenAICompletionsSetup";
+
+ // OpenAI Responses API
+ public static final String OPENAI_RESPONSES_CONNECTION =
+
"org.apache.flink.agents.integrations.chatmodels.openai.OpenAIResponsesModelConnection";
+ public static final String OPENAI_RESPONSES_SETUP =
+
"org.apache.flink.agents.integrations.chatmodels.openai.OpenAIResponsesModelSetup";
// Python Wrapper
public static final String PYTHON_WRAPPER_CONNECTION =
@@ -99,9 +105,9 @@ public final class ResourceName {
"flink_agents.integrations.chat_models.ollama_chat_model.OllamaChatModelSetup";
// OpenAI
- public static final String OPENAI_CONNECTION =
+ public static final String OPENAI_COMPLETIONS_CONNECTION =
"flink_agents.integrations.chat_models.openai.openai_chat_model.OpenAIChatModelConnection";
- public static final String OPENAI_SETUP =
+ public static final String OPENAI_COMPLETIONS_SETUP =
"flink_agents.integrations.chat_models.openai.openai_chat_model.OpenAIChatModelSetup";
// Tongyi
diff --git a/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ChatModelIntegrationAgent.java b/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ChatModelIntegrationAgent.java
index 99977169..e3b10abb 100644
--- a/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ChatModelIntegrationAgent.java
+++ b/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ChatModelIntegrationAgent.java
@@ -81,9 +81,15 @@ public class ChatModelIntegrationAgent extends Agent {
.build();
} else if (provider.equals("OPENAI")) {
String apiKey = System.getenv().get("OPENAI_API_KEY");
- return
ResourceDescriptor.Builder.newBuilder(ResourceName.ChatModel.OPENAI_CONNECTION)
+ return ResourceDescriptor.Builder.newBuilder(
+
ResourceName.ChatModel.OPENAI_COMPLETIONS_CONNECTION)
.addInitialArgument("api_key", apiKey)
.build();
+ } else if (provider.equals("OPENAI_RESPONSES")) {
+ return ResourceDescriptor.Builder.newBuilder(
+ ResourceName.ChatModel.OPENAI_RESPONSES_CONNECTION)
+ .addInitialArgument("api_key",
System.getenv().get("OPENAI_API_KEY"))
+ .build();
} else if (provider.equals("ANTHROPIC")) {
String apiKey = System.getenv().get("ANTHROPIC_API_KEY");
return ResourceDescriptor.Builder.newBuilder(
@@ -126,7 +132,17 @@ public class ChatModelIntegrationAgent extends Agent {
List.of("calculateBMI", "convertTemperature",
"createRandomNumber"))
.build();
} else if (provider.equals("OPENAI")) {
- return
ResourceDescriptor.Builder.newBuilder(ResourceName.ChatModel.OPENAI_SETUP)
+ return ResourceDescriptor.Builder.newBuilder(
+ ResourceName.ChatModel.OPENAI_COMPLETIONS_SETUP)
+ .addInitialArgument("connection", "chatModelConnection")
+ .addInitialArgument("model", "gpt-4o-mini")
+ .addInitialArgument(
+ "tools",
+ List.of("calculateBMI", "convertTemperature",
"createRandomNumber"))
+ .build();
+ } else if (provider.equals("OPENAI_RESPONSES")) {
+ return ResourceDescriptor.Builder.newBuilder(
+ ResourceName.ChatModel.OPENAI_RESPONSES_SETUP)
.addInitialArgument("connection", "chatModelConnection")
.addInitialArgument("model", "gpt-4o-mini")
.addInitialArgument(
diff --git a/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ChatModelIntegrationTest.java b/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ChatModelIntegrationTest.java
index c843b970..75a3d5c1 100644
--- a/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ChatModelIntegrationTest.java
+++ b/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ChatModelIntegrationTest.java
@@ -53,7 +53,7 @@ public class ChatModelIntegrationTest extends
OllamaPreparationUtils {
}
@ParameterizedTest()
- @ValueSource(strings = {"ANTHROPIC", "AZURE", "OLLAMA", "OPENAI"})
+ @ValueSource(strings = {"ANTHROPIC", "AZURE", "OLLAMA", "OPENAI",
"OPENAI_RESPONSES"})
public void testChatModeIntegration(String provider) throws Exception {
Assumptions.assumeTrue(
(OLLAMA.equals(provider) && ollamaReady)
diff --git a/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIChatModelConnection.java b/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAICompletionsConnection.java
similarity index 98%
copy from integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIChatModelConnection.java
copy to integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAICompletionsConnection.java
index b04cd2b2..6ed1b170 100644
--- a/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIChatModelConnection.java
+++ b/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAICompletionsConnection.java
@@ -80,7 +80,7 @@ import java.util.stream.Collectors;
* public class MyAgent extends Agent {
* @ChatModelConnection
* public static ResourceDesc openAI() {
- * return
ResourceDescriptor.Builder.newBuilder(OpenAIChatModelConnection.class.getName())
+ * return
ResourceDescriptor.Builder.newBuilder(OpenAICompletionsConnection.class.getName())
* .addInitialArgument("api_key", System.getenv("OPENAI_API_KEY"))
* .addInitialArgument("api_base_url", "https://api.openai.com/v1")
* .addInitialArgument("timeout", 120)
@@ -91,7 +91,7 @@ import java.util.stream.Collectors;
* }
* }</pre>
*/
-public class OpenAIChatModelConnection extends BaseChatModelConnection {
+public class OpenAICompletionsConnection extends BaseChatModelConnection {
private static final TypeReference<Map<String, Object>> MAP_TYPE = new
TypeReference<>() {};
@@ -99,7 +99,7 @@ public class OpenAIChatModelConnection extends
BaseChatModelConnection {
private final OpenAIClient client;
private final String defaultModel;
- public OpenAIChatModelConnection(
+ public OpenAICompletionsConnection(
ResourceDescriptor descriptor, BiFunction<String, ResourceType,
Resource> getResource) {
super(descriptor, getResource);
diff --git a/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIChatModelSetup.java b/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAICompletionsSetup.java
similarity index 95%
copy from integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIChatModelSetup.java
copy to integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAICompletionsSetup.java
index 270dcb93..13a32ac2 100644
--- a/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIChatModelSetup.java
+++ b/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAICompletionsSetup.java
@@ -34,7 +34,7 @@ import java.util.function.BiFunction;
*
* <p>Responsible for providing per-chat configuration such as model,
temperature, tool bindings,
* and additional OpenAI parameters. The setup delegates execution to {@link
- * OpenAIChatModelConnection}.
+ * OpenAICompletionsConnection}.
*
* <p>Example usage:
*
@@ -42,7 +42,7 @@ import java.util.function.BiFunction;
* public class MyAgent extends Agent {
* @ChatModelSetup
* public static ResourceDesc openAI() {
- * return
ResourceDescriptor.Builder.newBuilder(OpenAIChatModelSetup.class.getName())
+ * return
ResourceDescriptor.Builder.newBuilder(OpenAICompletionsSetup.class.getName())
* .addInitialArgument("connection", "myOpenAIConnection")
* .addInitialArgument("model", "gpt-4o-mini")
* .addInitialArgument("temperature", 0.3d)
@@ -58,7 +58,7 @@ import java.util.function.BiFunction;
* }
* }</pre>
*/
-public class OpenAIChatModelSetup extends BaseChatModelSetup {
+public class OpenAICompletionsSetup extends BaseChatModelSetup {
private static final String DEFAULT_MODEL = "gpt-3.5-turbo";
private static final double DEFAULT_TEMPERATURE = 0.1d;
@@ -74,7 +74,7 @@ public class OpenAIChatModelSetup extends BaseChatModelSetup {
private final String reasoningEffort;
private final Map<String, Object> additionalArguments;
- public OpenAIChatModelSetup(
+ public OpenAICompletionsSetup(
ResourceDescriptor descriptor, BiFunction<String, ResourceType,
Resource> getResource) {
super(descriptor, getResource);
this.temperature =
@@ -126,7 +126,7 @@ public class OpenAIChatModelSetup extends
BaseChatModelSetup {
}
}
- public OpenAIChatModelSetup(
+ public OpenAICompletionsSetup(
String model,
double temperature,
Integer maxTokens,
@@ -188,7 +188,7 @@ public class OpenAIChatModelSetup extends
BaseChatModelSetup {
Map<String, Object> additionalArguments,
List<String> tools) {
ResourceDescriptor.Builder builder =
-
ResourceDescriptor.Builder.newBuilder(OpenAIChatModelSetup.class.getName())
+
ResourceDescriptor.Builder.newBuilder(OpenAICompletionsSetup.class.getName())
.addInitialArgument("model", model)
.addInitialArgument("temperature", temperature);
diff --git a/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIChatModelConnection.java b/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIResponsesModelConnection.java
similarity index 52%
rename from integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIChatModelConnection.java
rename to integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIResponsesModelConnection.java
index b04cd2b2..f185d65f 100644
--- a/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIChatModelConnection.java
+++ b/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIResponsesModelConnection.java
@@ -25,28 +25,15 @@ import com.openai.client.OpenAIClient;
import com.openai.client.okhttp.OpenAIOkHttpClient;
import com.openai.core.JsonValue;
import com.openai.models.ChatModel;
-import com.openai.models.FunctionDefinition;
-import com.openai.models.FunctionParameters;
+import com.openai.models.Reasoning;
import com.openai.models.ReasoningEffort;
-import com.openai.models.chat.completions.ChatCompletion;
-import com.openai.models.chat.completions.ChatCompletionAssistantMessageParam;
-import com.openai.models.chat.completions.ChatCompletionCreateParams;
-import com.openai.models.chat.completions.ChatCompletionFunctionTool;
-import com.openai.models.chat.completions.ChatCompletionMessage;
-import
com.openai.models.chat.completions.ChatCompletionMessageFunctionToolCall;
-import com.openai.models.chat.completions.ChatCompletionMessageParam;
-import com.openai.models.chat.completions.ChatCompletionMessageToolCall;
-import com.openai.models.chat.completions.ChatCompletionSystemMessageParam;
-import com.openai.models.chat.completions.ChatCompletionTool;
-import com.openai.models.chat.completions.ChatCompletionToolMessageParam;
-import com.openai.models.chat.completions.ChatCompletionUserMessageParam;
+import com.openai.models.responses.*;
import org.apache.flink.agents.api.chat.messages.ChatMessage;
import org.apache.flink.agents.api.chat.messages.MessageRole;
import org.apache.flink.agents.api.chat.model.BaseChatModelConnection;
import org.apache.flink.agents.api.resource.Resource;
import org.apache.flink.agents.api.resource.ResourceDescriptor;
import org.apache.flink.agents.api.resource.ResourceType;
-import org.apache.flink.agents.api.tools.Tool;
import org.apache.flink.agents.api.tools.ToolMetadata;
import java.time.Duration;
@@ -57,17 +44,22 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;
-import java.util.stream.Collectors;
/**
- * A chat model integration for the OpenAI Chat Completions service using the
official Java SDK.
+ * A <b>dedicated</b> OpenAI chat model integration using the Responses API.
+ *
+ * <p>Unlike {@link OpenAICompletionsConnection} which uses the Chat
Completions API and works with
+ * any OpenAI-compatible provider (DeepSeek, DashScope, etc.), this
implementation uses OpenAI's
+ * Responses API which is specific to OpenAI.
+ *
+ * <p>For OpenAI-compatible providers that only support the Chat Completions
API, use {@link
+ * OpenAICompletionsConnection} instead.
*
* <p>Supported connection parameters:
*
* <ul>
* <li><b>api_key</b> (required): OpenAI API key
- * <li><b>api_base_url</b> (optional): Base URL for OpenAI API (defaults to
- * https://api.openai.com/v1)
+ * <li><b>api_base_url</b> (optional): Base URL for OpenAI API (useful for
proxies)
* <li><b>timeout</b> (optional): Timeout in seconds for API requests
* <li><b>max_retries</b> (optional): Maximum number of retry attempts
(default: 2)
* <li><b>default_headers</b> (optional): Map of default headers to include
in all requests
@@ -79,27 +71,25 @@ import java.util.stream.Collectors;
* <pre>{@code
* public class MyAgent extends Agent {
* @ChatModelConnection
- * public static ResourceDesc openAI() {
- * return
ResourceDescriptor.Builder.newBuilder(OpenAIChatModelConnection.class.getName())
+ * public static ResourceDesc openAIResponses() {
+ * return
ResourceDescriptor.Builder.newBuilder(OpenAIResponsesModelConnection.class.getName())
* .addInitialArgument("api_key", System.getenv("OPENAI_API_KEY"))
- * .addInitialArgument("api_base_url", "https://api.openai.com/v1")
* .addInitialArgument("timeout", 120)
* .addInitialArgument("max_retries", 3)
- * .addInitialArgument("default_headers",
Map.of("X-Custom-Header", "value"))
* .build();
* }
* }
* }</pre>
*/
-public class OpenAIChatModelConnection extends BaseChatModelConnection {
+public class OpenAIResponsesModelConnection extends BaseChatModelConnection {
private static final TypeReference<Map<String, Object>> MAP_TYPE = new
TypeReference<>() {};
+ private static final ObjectMapper mapper = new ObjectMapper();
- private final ObjectMapper mapper = new ObjectMapper();
private final OpenAIClient client;
private final String defaultModel;
- public OpenAIChatModelConnection(
+ public OpenAIResponsesModelConnection(
ResourceDescriptor descriptor, BiFunction<String, ResourceType,
Resource> getResource) {
super(descriptor, getResource);
@@ -138,14 +128,15 @@ public class OpenAIChatModelConnection extends
BaseChatModelConnection {
@Override
public ChatMessage chat(
- List<ChatMessage> messages, List<Tool> tools, Map<String, Object>
arguments) {
+ List<ChatMessage> messages,
+ List<org.apache.flink.agents.api.tools.Tool> tools,
+ Map<String, Object> arguments) {
try {
- ChatCompletionCreateParams params = buildRequest(messages, tools,
arguments);
- ChatCompletion completion =
client.chat().completions().create(params);
- ChatMessage response = convertResponse(completion);
+ ResponseCreateParams params = buildRequest(messages, tools,
arguments);
+ Response response = client.responses().create(params);
+ ChatMessage result = convertResponse(response);
- // Record token metrics
- if (completion.usage().isPresent()) {
+ if (response.usage().isPresent()) {
String modelName = arguments != null ? (String)
arguments.get("model") : null;
if (modelName == null || modelName.isBlank()) {
modelName = this.defaultModel;
@@ -153,19 +144,21 @@ public class OpenAIChatModelConnection extends
BaseChatModelConnection {
if (modelName != null && !modelName.isBlank()) {
recordTokenMetrics(
modelName,
- completion.usage().get().promptTokens(),
- completion.usage().get().completionTokens());
+ response.usage().get().inputTokens(),
+ response.usage().get().outputTokens());
}
}
- return response;
+ return result;
} catch (Exception e) {
- throw new RuntimeException("Failed to call OpenAI chat completions
API.", e);
+ throw new RuntimeException("Failed to call OpenAI Responses API.",
e);
}
}
- private ChatCompletionCreateParams buildRequest(
- List<ChatMessage> messages, List<Tool> tools, Map<String, Object>
rawArguments) {
+ private ResponseCreateParams buildRequest(
+ List<ChatMessage> messages,
+ List<org.apache.flink.agents.api.tools.Tool> tools,
+ Map<String, Object> rawArguments) {
Map<String, Object> arguments =
rawArguments != null ? new HashMap<>(rawArguments) : new
HashMap<>();
@@ -175,13 +168,12 @@ public class OpenAIChatModelConnection extends
BaseChatModelConnection {
modelName = this.defaultModel;
}
- ChatCompletionCreateParams.Builder builder =
- ChatCompletionCreateParams.builder()
+ List<ResponseInputItem> inputItems = convertInputItems(messages);
+
+ ResponseCreateParams.Builder builder =
+ ResponseCreateParams.builder()
.model(ChatModel.of(modelName))
- .messages(
- messages.stream()
- .map(this::convertToOpenAIMessage)
- .collect(Collectors.toList()));
+ .inputOfResponse(inputItems);
if (tools != null && !tools.isEmpty()) {
builder.tools(convertTools(tools, strictMode));
@@ -194,24 +186,25 @@ public class OpenAIChatModelConnection extends
BaseChatModelConnection {
Object maxTokens = arguments.remove("max_tokens");
if (maxTokens instanceof Number) {
- builder.maxCompletionTokens(((Number) maxTokens).longValue());
- }
-
- Object logprobs = arguments.remove("logprobs");
- boolean logprobsEnabled = Boolean.TRUE.equals(logprobs);
- if (logprobsEnabled) {
- builder.logprobs(true);
- Object topLogprobs = arguments.remove("top_logprobs");
- if (topLogprobs instanceof Number) {
- builder.topLogprobs(((Number) topLogprobs).longValue());
- }
- } else {
- arguments.remove("top_logprobs");
+ builder.maxOutputTokens(((Number) maxTokens).longValue());
}
Object reasoningEffort = arguments.remove("reasoning_effort");
if (reasoningEffort instanceof String) {
- builder.reasoningEffort(ReasoningEffort.of((String)
reasoningEffort));
+ builder.reasoning(
+ Reasoning.builder()
+ .effort(ReasoningEffort.of((String)
reasoningEffort))
+ .build());
+ }
+
+ Object store = arguments.remove("store");
+ if (Boolean.TRUE.equals(store)) {
+ builder.store(true);
+ }
+
+ Object instructions = arguments.remove("instructions");
+ if (instructions instanceof String) {
+ builder.instructions((String) instructions);
}
@SuppressWarnings("unchecked")
@@ -225,181 +218,191 @@ public class OpenAIChatModelConnection extends
BaseChatModelConnection {
return builder.build();
}
- private List<ChatCompletionTool> convertTools(List<Tool> tools, boolean
strictMode) {
- List<ChatCompletionTool> openaiTools = new ArrayList<>(tools.size());
- for (Tool tool : tools) {
- ToolMetadata metadata = tool.getMetadata();
- FunctionDefinition.Builder functionBuilder =
- FunctionDefinition.builder()
- .name(metadata.getName())
- .description(metadata.getDescription());
-
- String schema = metadata.getInputSchema();
- if (schema != null && !schema.isBlank()) {
- functionBuilder.parameters(parseFunctionParameters(schema));
- }
-
- if (strictMode) {
- functionBuilder.strict(true);
- }
-
- ChatCompletionFunctionTool functionTool =
- ChatCompletionFunctionTool.builder()
- .function(functionBuilder.build())
- .type(JsonValue.from("function"))
- .build();
-
- openaiTools.add(ChatCompletionTool.ofFunction(functionTool));
+ private List<ResponseInputItem> convertInputItems(List<ChatMessage>
messages) {
+ List<ResponseInputItem> items = new ArrayList<>();
+ for (ChatMessage message : messages) {
+ items.addAll(convertSingleMessage(message));
}
- return openaiTools;
+ return items;
}
- private FunctionParameters parseFunctionParameters(String schemaJson) {
- try {
- JsonNode root = mapper.readTree(schemaJson);
- if (root == null || !root.isObject()) {
- return FunctionParameters.builder().build();
- }
-
- FunctionParameters.Builder builder = FunctionParameters.builder();
- root.fields()
- .forEachRemaining(
- entry ->
- builder.putAdditionalProperty(
- entry.getKey(),
-
JsonValue.fromJsonNode(entry.getValue())));
- return builder.build();
- } catch (JsonProcessingException e) {
- throw new RuntimeException("Failed to parse tool schema JSON.", e);
- }
- }
-
- private ChatCompletionMessageParam convertToOpenAIMessage(ChatMessage
message) {
+ private List<ResponseInputItem> convertSingleMessage(ChatMessage message) {
+ List<ResponseInputItem> items = new ArrayList<>();
MessageRole role = message.getRole();
String content = Optional.ofNullable(message.getContent()).orElse("");
switch (role) {
case SYSTEM:
- return ChatCompletionMessageParam.ofSystem(
-
ChatCompletionSystemMessageParam.builder().content(content).build());
+ items.add(
+ ResponseInputItem.ofMessage(
+ ResponseInputItem.Message.builder()
+
.role(ResponseInputItem.Message.Role.SYSTEM)
+ .addInputTextContent(content)
+ .build()));
+ break;
+
case USER:
- return ChatCompletionMessageParam.ofUser(
-
ChatCompletionUserMessageParam.builder().content(content).build());
+ items.add(
+ ResponseInputItem.ofMessage(
+ ResponseInputItem.Message.builder()
+
.role(ResponseInputItem.Message.Role.USER)
+ .addInputTextContent(content)
+ .build()));
+ break;
+
case ASSISTANT:
- ChatCompletionAssistantMessageParam.Builder assistantBuilder =
- ChatCompletionAssistantMessageParam.builder();
- if (!content.isEmpty()) {
- assistantBuilder.content(content);
- }
List<Map<String, Object>> toolCalls = message.getToolCalls();
if (toolCalls != null && !toolCalls.isEmpty()) {
-
assistantBuilder.toolCalls(convertAssistantToolCalls(toolCalls));
+ for (Map<String, Object> call : toolCalls) {
+ Map<String, Object> functionPayload =
toMap(call.get("function"));
+ String responseId = String.valueOf(call.get("id"));
+ String callId =
String.valueOf(call.get("original_id"));
+ String name =
String.valueOf(functionPayload.get("name"));
+ String args =
serializeArguments(functionPayload.get("arguments"));
+
+ items.add(
+ ResponseInputItem.ofFunctionCall(
+ ResponseFunctionToolCall.builder()
+ .id(responseId)
+ .callId(callId)
+ .name(name)
+ .arguments(args)
+
.status(ResponseFunctionToolCall.Status.COMPLETED)
+ .build()));
+ }
}
- Object refusal = message.getExtraArgs().get("refusal");
- if (refusal instanceof String) {
- assistantBuilder.refusal((String) refusal);
+ if (!content.isEmpty()) {
+ items.add(
+ ResponseInputItem.ofEasyInputMessage(
+ EasyInputMessage.builder()
+
.role(EasyInputMessage.Role.ASSISTANT)
+ .content(content)
+ .build()));
}
- return
ChatCompletionMessageParam.ofAssistant(assistantBuilder.build());
+ break;
+
case TOOL:
- ChatCompletionToolMessageParam.Builder toolBuilder =
-
ChatCompletionToolMessageParam.builder().content(content);
Object toolCallId = message.getExtraArgs().get("externalId");
if (toolCallId == null) {
throw new IllegalArgumentException(
"Tool message must have an externalId in
extraArgs.");
}
- toolBuilder.toolCallId(toolCallId.toString());
- return ChatCompletionMessageParam.ofTool(toolBuilder.build());
+ items.add(
+ ResponseInputItem.ofFunctionCallOutput(
+ ResponseInputItem.FunctionCallOutput.builder()
+ .callId(toolCallId.toString())
+ .output(content)
+ .build()));
+ break;
+
default:
throw new IllegalArgumentException("Unsupported role: " +
role);
}
+ return items;
}
- private List<ChatCompletionMessageToolCall> convertAssistantToolCalls(
- List<Map<String, Object>> toolCalls) {
- List<ChatCompletionMessageToolCall> result = new
ArrayList<>(toolCalls.size());
- for (Map<String, Object> call : toolCalls) {
- Object type = call.getOrDefault("type", "function");
- if (!"function".equals(String.valueOf(type))) {
- continue;
- }
-
- Map<String, Object> functionPayload = toMap(call.get("function"));
- ChatCompletionMessageFunctionToolCall.Function.Builder
functionBuilder =
- ChatCompletionMessageFunctionToolCall.Function.builder();
-
- Object functionName = functionPayload.get("name");
- if (functionName != null) {
- functionBuilder.name(functionName.toString());
- }
-
- Object arguments = functionPayload.get("arguments");
- functionBuilder.arguments(serializeArguments(arguments));
+ private List<Tool> convertTools(
+ List<org.apache.flink.agents.api.tools.Tool> tools, boolean
strictMode) {
+ List<Tool> responsesTools = new ArrayList<>(tools.size());
+ for (org.apache.flink.agents.api.tools.Tool tool : tools) {
+ ToolMetadata metadata = tool.getMetadata();
+ FunctionTool.Builder functionBuilder =
+ FunctionTool.builder()
+ .name(metadata.getName())
+ .description(metadata.getDescription());
- Object idObj = call.get("id");
- if (idObj == null) {
- throw new IllegalArgumentException("Tool call must have an
id.");
+ String schema = metadata.getInputSchema();
+ if (schema != null && !schema.isBlank()) {
+ functionBuilder.parameters(parseToolParameters(schema));
}
- String toolCallId = idObj.toString();
- ChatCompletionMessageFunctionToolCall.Builder toolCallBuilder =
- ChatCompletionMessageFunctionToolCall.builder()
- .id(toolCallId)
- .function(functionBuilder.build())
- .type(JsonValue.from(String.valueOf(type)));
+ functionBuilder.strict(strictMode);
-
result.add(ChatCompletionMessageToolCall.ofFunction(toolCallBuilder.build()));
+ responsesTools.add(Tool.ofFunction(functionBuilder.build()));
}
- return result;
+ return responsesTools;
}
- private ChatMessage convertResponse(ChatCompletion completion) {
- List<ChatCompletion.Choice> choices = completion.choices();
- if (choices.isEmpty()) {
- throw new IllegalStateException("OpenAI response did not contain
any choices.");
+ private ChatMessage convertResponse(Response response) {
+ List<ResponseOutputItem> output = response.output();
+ if (output == null || output.isEmpty()) {
+ throw new IllegalStateException("OpenAI Responses API did not
return any output.");
}
- ChatCompletionMessage message = choices.get(0).message();
- String content = message.content().orElse("");
- ChatMessage response = ChatMessage.assistant(content);
+ StringBuilder textContent = new StringBuilder();
+ StringBuilder refusalContent = new StringBuilder();
+ List<Map<String, Object>> toolCalls = new ArrayList<>();
+
+ for (ResponseOutputItem item : output) {
+ if (item.isMessage()) {
+ ResponseOutputMessage msg = item.asMessage();
+ for (ResponseOutputMessage.Content contentBlock :
msg.content()) {
+ if (contentBlock.isOutputText()) {
+ textContent.append(contentBlock.asOutputText().text());
+ } else if (contentBlock.isRefusal()) {
+
refusalContent.append(contentBlock.asRefusal().refusal());
+ }
+ }
+ } else if (item.isFunctionCall()) {
+ ResponseFunctionToolCall fc = item.asFunctionCall();
+ Map<String, Object> callMap = new LinkedHashMap<>();
+
+ String callId = fc.callId();
+ if (callId == null || callId.isBlank()) {
+ throw new IllegalStateException(
+ "OpenAI Responses API returned a function call
without a call_id.");
+ }
- message.refusal().ifPresent(refusal ->
response.getExtraArgs().put("refusal", refusal));
+ callMap.put(
+ "id",
+ fc.id()
+ .orElseThrow(
+ () ->
+ new IllegalStateException(
+ "OpenAI Responses API
returned a function call without an id.")));
+ callMap.put("type", "function");
+
+ Map<String, Object> functionMap = new LinkedHashMap<>();
+ functionMap.put("name", fc.name());
+ functionMap.put("arguments", parseArguments(fc.arguments()));
+ callMap.put("function", functionMap);
+ callMap.put("original_id", callId);
+
+ toolCalls.add(callMap);
+ }
+ }
- List<ChatCompletionMessageToolCall> toolCalls =
message.toolCalls().orElse(List.of());
+ ChatMessage result = ChatMessage.assistant(textContent.toString());
if (!toolCalls.isEmpty()) {
- response.setToolCalls(convertResponseToolCalls(toolCalls));
+ result.setToolCalls(toolCalls);
}
- return response;
- }
-
- private List<Map<String, Object>> convertResponseToolCalls(
- List<ChatCompletionMessageToolCall> toolCalls) {
- List<Map<String, Object>> result = new ArrayList<>(toolCalls.size());
- for (ChatCompletionMessageToolCall toolCall : toolCalls) {
- if (!toolCall.isFunction()) {
- continue;
- }
+ if (refusalContent.length() > 0) {
+ result.getExtraArgs().put("refusal", refusalContent.toString());
+ }
- ChatCompletionMessageFunctionToolCall functionToolCall =
toolCall.asFunction();
- Map<String, Object> callMap = new LinkedHashMap<>();
- String toolCallId = functionToolCall.id();
- if (toolCallId == null || toolCallId.isBlank()) {
- throw new IllegalStateException("OpenAI tool call ID is null
or empty.");
- }
+ result.getExtraArgs().put("response_id", response.id());
- callMap.put("id", toolCallId);
- callMap.put("type", "function");
+ return result;
+ }
- ChatCompletionMessageFunctionToolCall.Function function =
functionToolCall.function();
- Map<String, Object> functionMap = new LinkedHashMap<>();
- functionMap.put("name", function.name());
- functionMap.put("arguments", parseArguments(function.arguments()));
- callMap.put("function", functionMap);
- callMap.put("original_id", toolCallId);
- result.add(callMap);
+ private FunctionTool.Parameters parseToolParameters(String schemaJson) {
+ try {
+ JsonNode root = mapper.readTree(schemaJson);
+ if (root == null || !root.isObject()) {
+ return FunctionTool.Parameters.builder().build();
+ }
+ FunctionTool.Parameters.Builder builder =
FunctionTool.Parameters.builder();
+ root.fields()
+ .forEachRemaining(
+ entry ->
+ builder.putAdditionalProperty(
+ entry.getKey(),
+
JsonValue.fromJsonNode(entry.getValue())));
+ return builder.build();
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException("Failed to parse tool schema JSON.", e);
}
- return result;
}
private Map<String, Object> parseArguments(String arguments) {
diff --git a/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIChatModelSetup.java b/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIResponsesModelSetup.java
similarity index 74%
rename from integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIChatModelSetup.java
rename to integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIResponsesModelSetup.java
index 270dcb93..54e7d49b 100644
--- a/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIChatModelSetup.java
+++ b/integrations/chat-models/openai/src/main/java/org/apache/flink/agents/integrations/chatmodels/openai/OpenAIResponsesModelSetup.java
@@ -30,53 +30,52 @@ import java.util.Set;
import java.util.function.BiFunction;
/**
- * Chat model setup for the OpenAI Chat Completions API.
+ * Chat model setup for the OpenAI Responses API.
*
- * <p>Responsible for providing per-chat configuration such as model, temperature, tool bindings,
- * and additional OpenAI parameters. The setup delegates execution to {@link
- * OpenAIChatModelConnection}.
+ * <p>Responsible for providing per-chat configuration such as model, temperature, max tokens, tool
+ * bindings, and Responses API-specific parameters. The setup delegates execution to {@link
+ * OpenAIResponsesModelConnection}.
*
* <p>Example usage:
*
* <pre>{@code
* public class MyAgent extends Agent {
* @ChatModelSetup
- * public static ResourceDesc openAI() {
- * return ResourceDescriptor.Builder.newBuilder(OpenAIChatModelSetup.class.getName())
- * .addInitialArgument("connection", "myOpenAIConnection")
- * .addInitialArgument("model", "gpt-4o-mini")
+ * public static ResourceDesc openAIResponses() {
+ * return ResourceDescriptor.Builder.newBuilder(OpenAIResponsesModelSetup.class.getName())
+ * .addInitialArgument("connection", "myOpenAIResponsesConnection")
+ * .addInitialArgument("model", "gpt-4o")
* .addInitialArgument("temperature", 0.3d)
- * .addInitialArgument("max_tokens", 500)
+ * .addInitialArgument("max_tokens", 2048)
* .addInitialArgument("strict", true)
* .addInitialArgument("reasoning_effort", "medium")
- * .addInitialArgument("tools", List.of("convertTemperature", "calculateBMI"))
- * .addInitialArgument(
- * "additional_kwargs",
- * Map.of("seed", 42, "user", "user-123"))
+ * .addInitialArgument("store", true)
+ * .addInitialArgument("tools", List.of("getWeather", "searchDB"))
* .build();
* }
* }
* }</pre>
*/
-public class OpenAIChatModelSetup extends BaseChatModelSetup {
+public class OpenAIResponsesModelSetup extends BaseChatModelSetup {
- private static final String DEFAULT_MODEL = "gpt-3.5-turbo";
+ private static final String DEFAULT_MODEL = "gpt-4o";
private static final double DEFAULT_TEMPERATURE = 0.1d;
- private static final int DEFAULT_TOP_LOGPROBS = 0;
private static final boolean DEFAULT_STRICT = false;
+ private static final boolean DEFAULT_STORE = false;
private static final Set<String> VALID_REASONING_EFFORTS = Set.of("low", "medium", "high");
private final Double temperature;
private final Integer maxTokens;
- private final Boolean logprobs;
- private final Integer topLogprobs;
private final Boolean strict;
private final String reasoningEffort;
+ private final Boolean store;
+ private final String instructions;
private final Map<String, Object> additionalArguments;
- public OpenAIChatModelSetup(
+ public OpenAIResponsesModelSetup(
ResourceDescriptor descriptor, BiFunction<String, ResourceType, Resource> getResource) {
super(descriptor, getResource);
+
this.temperature =
Optional.ofNullable(descriptor.<Number>getArgument("temperature"))
.map(Number::doubleValue)
@@ -93,16 +92,6 @@ public class OpenAIChatModelSetup extends BaseChatModelSetup {
throw new IllegalArgumentException("max_tokens must be greater than 0");
}
- this.logprobs = descriptor.getArgument("logprobs");
-
- this.topLogprobs =
- Optional.ofNullable(descriptor.<Number>getArgument("top_logprobs"))
- .map(Number::intValue)
- .orElse(DEFAULT_TOP_LOGPROBS);
- if (this.topLogprobs < 0 || this.topLogprobs > 20) {
- throw new IllegalArgumentException("top_logprobs must be between 0 and 20");
- }
-
this.strict =
Optional.ofNullable(descriptor.<Boolean>getArgument("strict"))
.orElse(DEFAULT_STRICT);
@@ -114,26 +103,30 @@ public class OpenAIChatModelSetup extends BaseChatModelSetup {
"reasoning_effort must be one of: low, medium, high");
}
- Map<String, Object> additional =
+ this.store =
+ Optional.ofNullable(descriptor.<Boolean>getArgument("store")).orElse(DEFAULT_STORE);
+
+ this.instructions = descriptor.getArgument("instructions");
+
+ this.additionalArguments =
Optional.ofNullable(
descriptor.<Map<String, Object>>getArgument("additional_kwargs"))
.map(HashMap::new)
.orElseGet(HashMap::new);
- this.additionalArguments = additional;
if (this.model == null || this.model.isBlank()) {
this.model = DEFAULT_MODEL;
}
}
- public OpenAIChatModelSetup(
+ public OpenAIResponsesModelSetup(
String model,
double temperature,
Integer maxTokens,
- Boolean logprobs,
- Integer topLogprobs,
Boolean strict,
String reasoningEffort,
+ Boolean store,
+ String instructions,
Map<String, Object> additionalArguments,
List<String> tools,
BiFunction<String, ResourceType, Resource> getResource) {
@@ -142,10 +135,10 @@ public class OpenAIChatModelSetup extends BaseChatModelSetup {
model,
temperature,
maxTokens,
- logprobs,
- topLogprobs,
strict,
reasoningEffort,
+ store,
+ instructions,
additionalArguments,
tools),
getResource);
@@ -161,16 +154,18 @@ public class OpenAIChatModelSetup extends BaseChatModelSetup {
if (maxTokens != null) {
parameters.put("max_tokens", maxTokens);
}
- if (Boolean.TRUE.equals(logprobs)) {
- parameters.put("logprobs", logprobs);
- parameters.put("top_logprobs", topLogprobs);
- }
if (strict) {
parameters.put("strict", strict);
}
if (reasoningEffort != null) {
parameters.put("reasoning_effort", reasoningEffort);
}
+ if (store) {
+ parameters.put("store", store);
+ }
+ if (instructions != null) {
+ parameters.put("instructions", instructions);
+ }
if (additionalArguments != null && !additionalArguments.isEmpty()) {
parameters.put("additional_kwargs", additionalArguments);
}
@@ -181,32 +176,32 @@ public class OpenAIChatModelSetup extends BaseChatModelSetup {
String model,
double temperature,
Integer maxTokens,
- Boolean logprobs,
- Integer topLogprobs,
Boolean strict,
String reasoningEffort,
+ Boolean store,
+ String instructions,
Map<String, Object> additionalArguments,
List<String> tools) {
ResourceDescriptor.Builder builder =
- ResourceDescriptor.Builder.newBuilder(OpenAIChatModelSetup.class.getName())
+ ResourceDescriptor.Builder.newBuilder(OpenAIResponsesModelSetup.class.getName())
.addInitialArgument("model", model)
.addInitialArgument("temperature", temperature);
if (maxTokens != null) {
builder.addInitialArgument("max_tokens", maxTokens);
}
- if (logprobs != null) {
- builder.addInitialArgument("logprobs", logprobs);
- }
- if (topLogprobs != null) {
- builder.addInitialArgument("top_logprobs", topLogprobs);
- }
if (strict != null) {
builder.addInitialArgument("strict", strict);
}
if (reasoningEffort != null) {
builder.addInitialArgument("reasoning_effort", reasoningEffort);
}
+ if (store != null) {
+ builder.addInitialArgument("store", store);
+ }
+ if (instructions != null) {
+ builder.addInitialArgument("instructions", instructions);
+ }
if (additionalArguments != null && !additionalArguments.isEmpty()) {
builder.addInitialArgument("additional_kwargs", additionalArguments);
}
diff --git a/python/flink_agents/api/resource.py b/python/flink_agents/api/resource.py
index 76ebf937..a5b1dd86 100644
--- a/python/flink_agents/api/resource.py
+++ b/python/flink_agents/api/resource.py
@@ -271,9 +271,12 @@ class ResourceName:
OLLAMA_CONNECTION = "org.apache.flink.agents.integrations.chatmodels.ollama.OllamaChatModelConnection"
OLLAMA_SETUP = "org.apache.flink.agents.integrations.chatmodels.ollama.OllamaChatModelSetup"
- # OpenAI
- OPENAI_CONNECTION = "org.apache.flink.agents.integrations.chatmodels.openai.OpenAIChatModelConnection"
- OPENAI_SETUP = "org.apache.flink.agents.integrations.chatmodels.openai.OpenAIChatModelSetup"
+ # OpenAI Completions
+ OPENAI_COMPLETIONS_CONNECTION = "org.apache.flink.agents.integrations.chatmodels.openai.OpenAICompletionsConnection"
+ OPENAI_COMPLETIONS_SETUP = "org.apache.flink.agents.integrations.chatmodels.openai.OpenAICompletionsSetup"
+
+ OPENAI_RESPONSES_CONNECTION = "org.apache.flink.agents.integrations.chatmodels.openai.OpenAIResponsesModelConnection"
+ OPENAI_RESPONSES_SETUP = "org.apache.flink.agents.integrations.chatmodels.openai.OpenAIResponsesModelSetup"
class EmbeddingModel:
"""EmbeddingModel resource names."""