This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push:
new ef22016 [Feature][Java] Add Anthropic chat model integration (#369)
ef22016 is described below
commit ef2201699427c98c7d560ec1d1368cf2c8286f2d
Author: Xiang Li <[email protected]>
AuthorDate: Sun Dec 7 18:19:16 2025 -0800
[Feature][Java] Add Anthropic chat model integration (#369)
---
.../pom.xml | 5 +
.../test/ChatModelIntegrationAgent.java | 17 +
.../integration/test/ChatModelIntegrationTest.java | 2 +-
integrations/{ => chat-models/anthropic}/pom.xml | 34 +-
.../anthropic/AnthropicChatModelConnection.java | 509 +++++++++++++++++++++
.../anthropic/AnthropicChatModelSetup.java | 175 +++++++
integrations/chat-models/pom.xml | 1 +
integrations/pom.xml | 1 +
8 files changed, 731 insertions(+), 13 deletions(-)
diff --git a/e2e-test/flink-agents-end-to-end-tests-integration/pom.xml
b/e2e-test/flink-agents-end-to-end-tests-integration/pom.xml
index 65c7d12..1a57cf0 100644
--- a/e2e-test/flink-agents-end-to-end-tests-integration/pom.xml
+++ b/e2e-test/flink-agents-end-to-end-tests-integration/pom.xml
@@ -59,6 +59,11 @@ under the License.
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-agents-integrations-chat-models-anthropic</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-agents-integrations-chat-models-azureai</artifactId>
diff --git
a/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ChatModelIntegrationAgent.java
b/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ChatModelIntegrationAgent.java
index 9056f99..6ce40a5 100644
---
a/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ChatModelIntegrationAgent.java
+++
b/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ChatModelIntegrationAgent.java
@@ -33,6 +33,8 @@ import org.apache.flink.agents.api.event.ChatRequestEvent;
import org.apache.flink.agents.api.event.ChatResponseEvent;
import org.apache.flink.agents.api.resource.ResourceDescriptor;
import org.apache.flink.agents.api.resource.ResourceType;
+import
org.apache.flink.agents.integrations.chatmodels.anthropic.AnthropicChatModelConnection;
+import
org.apache.flink.agents.integrations.chatmodels.anthropic.AnthropicChatModelSetup;
import
org.apache.flink.agents.integrations.chatmodels.azureai.AzureAIChatModelConnection;
import
org.apache.flink.agents.integrations.chatmodels.azureai.AzureAIChatModelSetup;
import
org.apache.flink.agents.integrations.chatmodels.ollama.OllamaChatModelConnection;
@@ -87,6 +89,13 @@ public class ChatModelIntegrationAgent extends Agent {
return
ResourceDescriptor.Builder.newBuilder(OpenAIChatModelConnection.class.getName())
.addInitialArgument("api_key", apiKey)
.build();
+ } else if (provider.equals("ANTHROPIC")) {
+ String apiKey = System.getenv().get("ANTHROPIC_API_KEY");
+ return ResourceDescriptor.Builder.newBuilder(
+ AnthropicChatModelConnection.class.getName())
+ .addInitialArgument("api_key", apiKey)
+ .addInitialArgument("timeout", 240)
+ .build();
} else {
throw new RuntimeException(String.format("Unknown model provider
%s", provider));
}
@@ -112,6 +121,14 @@ public class ChatModelIntegrationAgent extends Agent {
"tools",
List.of("calculateBMI", "convertTemperature",
"createRandomNumber"))
.build();
+ } else if (provider.equals("ANTHROPIC")) {
+ return
ResourceDescriptor.Builder.newBuilder(AnthropicChatModelSetup.class.getName())
+ .addInitialArgument("connection", "chatModelConnection")
+ .addInitialArgument("model", "claude-sonnet-4-20250514")
+ .addInitialArgument(
+ "tools",
+ List.of("calculateBMI", "convertTemperature",
"createRandomNumber"))
+ .build();
} else if (provider.equals("OPENAI")) {
return
ResourceDescriptor.Builder.newBuilder(OpenAIChatModelSetup.class.getName())
.addInitialArgument("connection", "chatModelConnection")
diff --git
a/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ChatModelIntegrationTest.java
b/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ChatModelIntegrationTest.java
index d1387b2..af2bf8c 100644
---
a/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ChatModelIntegrationTest.java
+++
b/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ChatModelIntegrationTest.java
@@ -52,7 +52,7 @@ public class ChatModelIntegrationTest extends
OllamaPreparationUtils {
}
@ParameterizedTest()
- @ValueSource(strings = {"OLLAMA", "AZURE", "OPENAI"})
+ @ValueSource(strings = {"ANTHROPIC", "AZURE", "OLLAMA", "OPENAI"})
public void testChatModeIntegration(String provider) throws Exception {
Assumptions.assumeTrue(
(OLLAMA.equals(provider) && ollamaReady)
diff --git a/integrations/pom.xml b/integrations/chat-models/anthropic/pom.xml
similarity index 56%
copy from integrations/pom.xml
copy to integrations/chat-models/anthropic/pom.xml
index dd21927..e59795f 100644
--- a/integrations/pom.xml
+++ b/integrations/chat-models/anthropic/pom.xml
@@ -22,22 +22,32 @@ under the License.
<parent>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-agents</artifactId>
+ <artifactId>flink-agents-integrations-chat-models</artifactId>
<version>0.2-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
</parent>
- <artifactId>flink-agents-integrations</artifactId>
- <name>Flink Agents : Integrations:</name>
- <packaging>pom</packaging>
+ <artifactId>flink-agents-integrations-chat-models-anthropic</artifactId>
+ <name>Flink Agents : Integrations: Chat Models: Anthropic</name>
+ <packaging>jar</packaging>
- <properties>
- <ollama4j.version>1.1.5</ollama4j.version>
- <openai.version>4.8.0</openai.version>
- </properties>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-agents-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-agents-plan</artifactId>
+ <version>${project.version}</version>
+ </dependency>
- <modules>
- <module>chat-models</module>
- <module>embedding-models</module>
- </modules>
+ <dependency>
+ <groupId>com.anthropic</groupId>
+ <artifactId>anthropic-java</artifactId>
+ <version>${anthropic.version}</version>
+ </dependency>
+ </dependencies>
</project>
diff --git
a/integrations/chat-models/anthropic/src/main/java/org/apache/flink/agents/integrations/chatmodels/anthropic/AnthropicChatModelConnection.java
b/integrations/chat-models/anthropic/src/main/java/org/apache/flink/agents/integrations/chatmodels/anthropic/AnthropicChatModelConnection.java
new file mode 100644
index 0000000..ab30c97
--- /dev/null
+++
b/integrations/chat-models/anthropic/src/main/java/org/apache/flink/agents/integrations/chatmodels/anthropic/AnthropicChatModelConnection.java
@@ -0,0 +1,509 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.integrations.chatmodels.anthropic;
+
+import com.anthropic.client.AnthropicClient;
+import com.anthropic.client.okhttp.AnthropicOkHttpClient;
+import com.anthropic.core.JsonValue;
+import com.anthropic.models.messages.ContentBlock;
+import com.anthropic.models.messages.ContentBlockParam;
+import com.anthropic.models.messages.Message;
+import com.anthropic.models.messages.MessageCreateParams;
+import com.anthropic.models.messages.MessageParam;
+import com.anthropic.models.messages.Model;
+import com.anthropic.models.messages.TextBlockParam;
+import com.anthropic.models.messages.Tool;
+import com.anthropic.models.messages.ToolResultBlockParam;
+import com.anthropic.models.messages.ToolUseBlockParam;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.chat.messages.MessageRole;
+import org.apache.flink.agents.api.chat.model.BaseChatModelConnection;
+import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.api.tools.ToolMetadata;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+
+/**
+ * A chat model integration for the Anthropic Chat service using the official
Java SDK.
+ *
+ * <p>Supported connection parameters:
+ *
+ * <ul>
+ * <li><b>api_key</b> (required): Anthropic API key
+ * <li><b>timeout</b> (optional): Timeout in seconds for API requests
+ * <li><b>max_retries</b> (optional): Maximum number of retry attempts
(default: 2)
+ * </ul>
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * public class MyAgent extends Agent {
+ * @ChatModelConnection
+ * public static ResourceDesc anthropic() {
+ * return
ResourceDescriptor.Builder.newBuilder(AnthropicChatModelConnection.class.getName())
+ * .addInitialArgument("api_key",
System.getenv("ANTHROPIC_API_KEY"))
+ * .addInitialArgument("timeout", 120)
+ * .addInitialArgument("max_retries", 3)
+ * .build();
+ * }
+ * }
+ * }</pre>
+ */
+public class AnthropicChatModelConnection extends BaseChatModelConnection {
+
+ private static final TypeReference<Map<String, Object>> MAP_TYPE = new
TypeReference<>() {};
+
+ private final ObjectMapper mapper = new ObjectMapper();
+ private final AnthropicClient client;
+ private final String defaultModel;
+
+ public AnthropicChatModelConnection(
+ ResourceDescriptor descriptor, BiFunction<String, ResourceType,
Resource> getResource) {
+ super(descriptor, getResource);
+
+ String apiKey = descriptor.getArgument("api_key");
+ if (apiKey == null || apiKey.isBlank()) {
+ throw new IllegalArgumentException("api_key should not be null or
empty.");
+ }
+
+ AnthropicOkHttpClient.Builder builder =
AnthropicOkHttpClient.builder().apiKey(apiKey);
+
+ Integer timeoutSeconds = descriptor.getArgument("timeout");
+ if (timeoutSeconds != null && timeoutSeconds > 0) {
+ builder.timeout(Duration.ofSeconds(timeoutSeconds));
+ }
+
+ Integer maxRetries = descriptor.getArgument("max_retries");
+ if (maxRetries != null && maxRetries >= 0) {
+ builder.maxRetries(maxRetries);
+ }
+
+ this.defaultModel = descriptor.getArgument("model");
+ this.client = builder.build();
+ }
+
+ @Override
+ public ChatMessage chat(
+ List<ChatMessage> messages,
+ List<org.apache.flink.agents.api.tools.Tool> tools,
+ Map<String, Object> arguments) {
+ try {
+ // Check if JSON prefill is requested before building request
(arguments may be
+ // modified).
+ boolean jsonPrefillRequested =
+ arguments != null &&
Boolean.TRUE.equals(arguments.get("json_prefill"));
+ // JSON prefill is automatically disabled when tools are passed in
the request,
+ // because it interferes with native tool calling.
+ boolean hasToolsInRequest = tools != null && !tools.isEmpty();
+ boolean jsonPrefillApplied = jsonPrefillRequested &&
!hasToolsInRequest;
+
+ MessageCreateParams params = buildRequest(messages, tools,
arguments);
+ Message response = client.messages().create(params);
+ return convertResponse(response, jsonPrefillApplied);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to call Anthropic messages
API.", e);
+ }
+ }
+
+ private MessageCreateParams buildRequest(
+ List<ChatMessage> messages,
+ List<org.apache.flink.agents.api.tools.Tool> tools,
+ Map<String, Object> rawArguments) {
+ Map<String, Object> arguments =
+ rawArguments != null ? new HashMap<>(rawArguments) : new
HashMap<>();
+
+ Object modelObj = arguments.remove("model");
+ String modelName = modelObj != null ? modelObj.toString() :
this.defaultModel;
+ if (modelName == null || modelName.isBlank()) {
+ modelName = this.defaultModel;
+ }
+
+ List<TextBlockParam> systemBlocks = extractSystemMessages(messages);
+
+ List<MessageParam> anthropicMessages =
+ messages.stream()
+ .filter(m -> m.getRole() != MessageRole.SYSTEM)
+ .map(this::convertToAnthropicMessage)
+ .collect(Collectors.toList());
+
+ MessageCreateParams.Builder builder =
+ MessageCreateParams.builder()
+ .model(Model.of(modelName))
+ .messages(anthropicMessages);
+
+ if (!systemBlocks.isEmpty()) {
+ builder.systemOfTextBlockParams(systemBlocks);
+ }
+
+ // Handle strict tools - enables structured outputs for tool use
+ Object strictTools = arguments.remove("strict_tools");
+ boolean strictToolsEnabled = Boolean.TRUE.equals(strictTools);
+
+ if (tools != null && !tools.isEmpty()) {
+ for (Tool tool : convertTools(tools, strictToolsEnabled)) {
+ builder.addTool(tool);
+ }
+ }
+
+ // Add beta header for strict tool use
+ //
https://platform.claude.com/docs/en/build-with-claude/structured-outputs#strict-tool-use
+ if (strictToolsEnabled) {
+ builder.putAdditionalHeader("anthropic-beta",
"structured-outputs-2025-11-13");
+ }
+
+ Object maxTokens = arguments.remove("max_tokens");
+ if (maxTokens instanceof Number) {
+ builder.maxTokens(((Number) maxTokens).longValue());
+ }
+
+ Object temperature = arguments.remove("temperature");
+ if (temperature instanceof Number) {
+ builder.temperature(((Number) temperature).doubleValue());
+ }
+
+ @SuppressWarnings("unchecked")
+ Map<String, Object> additionalKwargs =
+ (Map<String, Object>) arguments.remove("additional_kwargs");
+ if (additionalKwargs != null) {
+ applyAdditionalKwargs(builder, additionalKwargs);
+ }
+
+ // Handle JSON prefill - append a prefilled assistant message with "{"
to enforce JSON
+ // output. Note: JSON prefill is incompatible with tool use as it
forces the model to output
+ // JSON text instead of using native tool_use content blocks.
Automatically disable
+ // json_prefill when tools are actually passed in the request.
+ Object jsonPrefill = arguments.remove("json_prefill");
+ boolean hasToolsInRequest = tools != null && !tools.isEmpty();
+ if (Boolean.TRUE.equals(jsonPrefill) && !hasToolsInRequest) {
+ anthropicMessages.add(
+
MessageParam.builder().role(MessageParam.Role.ASSISTANT).content("{").build());
+ builder.messages(anthropicMessages);
+ }
+
+ return builder.build();
+ }
+
+ private List<TextBlockParam> extractSystemMessages(List<ChatMessage>
messages) {
+ return messages.stream()
+ .filter(m -> m.getRole() == MessageRole.SYSTEM)
+ .map(m ->
TextBlockParam.builder().text(m.getContent()).build())
+ .collect(Collectors.toList());
+ }
+
+ private MessageParam convertToAnthropicMessage(ChatMessage message) {
+ MessageRole role = message.getRole();
+ String content = Optional.ofNullable(message.getContent()).orElse("");
+
+ switch (role) {
+ case USER:
+ return
MessageParam.builder().role(MessageParam.Role.USER).content(content).build();
+
+ case ASSISTANT:
+ List<Map<String, Object>> toolCalls = message.getToolCalls();
+ if (toolCalls != null && !toolCalls.isEmpty()) {
+ List<ContentBlockParam> contentBlocks = new ArrayList<>();
+ if (!content.isEmpty()) {
+ contentBlocks.add(
+ ContentBlockParam.ofText(
+
TextBlockParam.builder().text(content).build()));
+ }
+ contentBlocks.addAll(convertToolCallsToToolUse(toolCalls));
+ return MessageParam.builder()
+ .role(MessageParam.Role.ASSISTANT)
+ .contentOfBlockParams(contentBlocks)
+ .build();
+ } else {
+ return MessageParam.builder()
+ .role(MessageParam.Role.ASSISTANT)
+ .content(content)
+ .build();
+ }
+
+ case TOOL:
+ Object toolCallId = message.getExtraArgs().get("externalId");
+ if (toolCallId == null) {
+ throw new IllegalArgumentException(
+ "Tool message must have an externalId in
extraArgs.");
+ }
+ ToolResultBlockParam toolResult =
+ ToolResultBlockParam.builder()
+ .toolUseId(toolCallId.toString())
+ .content(content)
+ .build();
+ return MessageParam.builder()
+ .role(MessageParam.Role.USER)
+
.contentOfBlockParams(List.of(ContentBlockParam.ofToolResult(toolResult)))
+ .build();
+
+ default:
+ throw new IllegalArgumentException("Unsupported role: " +
role);
+ }
+ }
+
+ private List<ContentBlockParam> convertToolCallsToToolUse(List<Map<String,
Object>> toolCalls) {
+ List<ContentBlockParam> blocks = new ArrayList<>();
+ for (Map<String, Object> call : toolCalls) {
+ Object type = call.getOrDefault("type", "function");
+ if (!"function".equals(String.valueOf(type))) {
+ continue;
+ }
+
+ Map<String, Object> functionPayload = toMap(call.get("function"));
+ String functionName = String.valueOf(functionPayload.get("name"));
+ Object arguments = functionPayload.get("arguments");
+ Map<String, Object> inputMap = toMap(arguments);
+
+ Object originalIdObj = call.get("original_id");
+ if (originalIdObj == null) {
+ throw new IllegalArgumentException(
+ "Tool call must have an original_id for Anthropic.");
+ }
+
+ ToolUseBlockParam toolUse =
+ ToolUseBlockParam.builder()
+ .id(originalIdObj.toString())
+ .name(functionName)
+ .input(toJsonValue(inputMap))
+ .build();
+
+ blocks.add(ContentBlockParam.ofToolUse(toolUse));
+ }
+ return blocks;
+ }
+
+ private List<Tool> convertTools(
+ List<org.apache.flink.agents.api.tools.Tool> tools, boolean
strictToolsEnabled) {
+ List<Tool> anthropicTools = new ArrayList<>(tools.size());
+ for (org.apache.flink.agents.api.tools.Tool tool : tools) {
+ ToolMetadata metadata = tool.getMetadata();
+ Tool.Builder toolBuilder =
+
Tool.builder().name(metadata.getName()).description(metadata.getDescription());
+
+ String schema = metadata.getInputSchema();
+ if (schema != null && !schema.isBlank()) {
+ toolBuilder.inputSchema(parseToolInputSchema(schema));
+ }
+
+ if (strictToolsEnabled) {
+ toolBuilder.putAdditionalProperty("strict",
JsonValue.from(true));
+ }
+
+ anthropicTools.add(toolBuilder.build());
+ }
+ return anthropicTools;
+ }
+
+ private Tool.InputSchema parseToolInputSchema(String schemaJson) {
+ try {
+ JsonNode root = mapper.readTree(schemaJson);
+ if (root == null || !root.isObject()) {
+ return Tool.InputSchema.builder().build();
+ }
+
+ Tool.InputSchema.Builder builder = Tool.InputSchema.builder();
+ root.fields()
+ .forEachRemaining(
+ entry ->
+ builder.putAdditionalProperty(
+ entry.getKey(),
+
JsonValue.fromJsonNode(entry.getValue())));
+
+ return builder.build();
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException("Failed to parse tool schema JSON.", e);
+ }
+ }
+
+ private ChatMessage convertResponse(Message response, boolean
jsonPrefillApplied) {
+ List<ContentBlock> contentBlocks = response.content();
+ if (contentBlocks.isEmpty()) {
+ throw new IllegalStateException("Anthropic response did not
contain any content.");
+ }
+
+ StringBuilder textContent = new StringBuilder();
+ // If JSON prefill was used, prepend "{" since the response only
contains the continuation
+ if (jsonPrefillApplied) {
+ textContent.append("{");
+ }
+ List<Map<String, Object>> toolCalls = new ArrayList<>();
+
+ for (ContentBlock block : contentBlocks) {
+ if (block.isText()) {
+ block.text()
+ .ifPresent(
+ textBlock -> {
+ textContent.append(textBlock.text());
+ });
+ } else if (block.isToolUse()) {
+ block.toolUse()
+ .ifPresent(
+ toolUse -> {
+ String toolUseId = toolUse.id();
+ Map<String, Object> toolCall = new
LinkedHashMap<>();
+ toolCall.put("id", toolUseId);
+ toolCall.put("type", "function");
+
+ Map<String, Object> functionMap = new
LinkedHashMap<>();
+ functionMap.put("name", toolUse.name());
+ JsonValue inputValue = toolUse._input();
+ Map<String, Object> inputMap =
jsonValueToMap(inputValue);
+ functionMap.put("arguments", inputMap);
+ toolCall.put("function", functionMap);
+ toolCall.put("original_id", toolUseId);
+
+ toolCalls.add(toolCall);
+ });
+ }
+ }
+
+ String finalText = textContent.toString();
+
+ // If the response has no tool calls, try to extract JSON from
markdown code blocks.
+ if (toolCalls.isEmpty()) {
+ finalText = extractJsonFromMarkdown(finalText);
+ }
+
+ ChatMessage chatMessage = ChatMessage.assistant(finalText);
+ if (!toolCalls.isEmpty()) {
+ chatMessage.setToolCalls(toolCalls);
+ }
+
+ return chatMessage;
+ }
+
+ /**
+ * Extracts JSON content from a string that may contain markdown code
blocks.
+ *
+ * <p>Claude often wraps JSON responses in markdown code blocks like
{@code ```json ... ```},
+ * especially when tools are configured (since json_prefill is disabled).
This method extracts
+ * the JSON content from such responses. If no code block is found, the
original content is
+ * returned unchanged.
+ *
+ * @param content The response content that may contain markdown-wrapped
JSON
+ * @return The extracted JSON string, or the original content if no code
block is found
+ */
+ private String extractJsonFromMarkdown(String content) {
+ if (content == null) {
+ return null;
+ }
+
+ String trimmed = content.trim();
+
+ // Try to find JSON in markdown code block (```json ... ``` or ``` ...
```)
+ int jsonBlockStart = trimmed.indexOf("```json");
+ int genericBlockStart = trimmed.indexOf("```");
+
+ int contentStart;
+
+ if (jsonBlockStart != -1) {
+ contentStart = jsonBlockStart + 7; // length of "```json"
+ } else if (genericBlockStart != -1) {
+ contentStart = genericBlockStart + 3; // length of "```"
+ } else {
+ return content;
+ }
+
+ // Find the closing ```
+ int blockEnd = trimmed.indexOf("```", contentStart);
+ if (blockEnd == -1) {
+ return content;
+ }
+
+ // Extract content between the markers
+ return trimmed.substring(contentStart, blockEnd).trim();
+ }
+
+ private void applyAdditionalKwargs(
+ MessageCreateParams.Builder builder, Map<String, Object> kwargs) {
+ for (Map.Entry<String, Object> entry : kwargs.entrySet()) {
+ String key = entry.getKey();
+ Object value = entry.getValue();
+
+ switch (key) {
+ case "top_k":
+ if (value instanceof Number) {
+ builder.topK(((Number) value).longValue());
+ }
+ break;
+ case "top_p":
+ if (value instanceof Number) {
+ builder.topP(((Number) value).doubleValue());
+ }
+ break;
+ case "stop_sequences":
+ if (value instanceof List) {
+ @SuppressWarnings("unchecked")
+ List<String> stopSequences = (List<String>) value;
+ builder.stopSequences(stopSequences);
+ }
+ break;
+ default:
+ builder.putAdditionalBodyProperty(key, toJsonValue(value));
+ break;
+ }
+ }
+ }
+
+ private Map<String, Object> toMap(Object value) {
+ if (value instanceof Map) {
+ @SuppressWarnings("unchecked")
+ Map<String, Object> casted = (Map<String, Object>) value;
+ return new LinkedHashMap<>(casted);
+ }
+ if (value == null) {
+ return new LinkedHashMap<>();
+ }
+ return mapper.convertValue(value, MAP_TYPE);
+ }
+
+ private JsonValue toJsonValue(Object value) {
+ if (value instanceof JsonValue) {
+ return (JsonValue) value;
+ }
+ if (value instanceof String
+ || value instanceof Number
+ || value instanceof Boolean
+ || value == null) {
+ return JsonValue.from(value);
+ }
+ return JsonValue.fromJsonNode(mapper.valueToTree(value));
+ }
+
+ private Map<String, Object> jsonValueToMap(JsonValue jsonValue) {
+ try {
+ String jsonString = mapper.writeValueAsString(jsonValue);
+ return mapper.readValue(jsonString, MAP_TYPE);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException("Failed to convert JsonValue to Map.",
e);
+ }
+ }
+}
diff --git
a/integrations/chat-models/anthropic/src/main/java/org/apache/flink/agents/integrations/chatmodels/anthropic/AnthropicChatModelSetup.java
b/integrations/chat-models/anthropic/src/main/java/org/apache/flink/agents/integrations/chatmodels/anthropic/AnthropicChatModelSetup.java
new file mode 100644
index 0000000..145213c
--- /dev/null
+++
b/integrations/chat-models/anthropic/src/main/java/org/apache/flink/agents/integrations/chatmodels/anthropic/AnthropicChatModelSetup.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.integrations.chatmodels.anthropic;
+
+import org.apache.flink.agents.api.chat.model.BaseChatModelSetup;
+import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.resource.ResourceType;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.BiFunction;
+
+/**
+ * Chat model setup for the Anthropic Messages API.
+ *
+ * <p>Responsible for providing per-chat configuration such as model,
temperature, max_tokens, tool
+ * bindings, and additional Anthropic parameters. The setup delegates
execution to {@link
+ * AnthropicChatModelConnection}.
+ *
+ * <p>Supported parameters:
+ *
+ * <ul>
+ * <li><b>connection</b> (required): Name of the
AnthropicChatModelConnection resource
+ * <li><b>model</b> (optional): Model name (default:
claude-sonnet-4-20250514)
+ * <li><b>temperature</b> (optional): Sampling temperature 0.0-1.0 (default:
0.1)
+ * <li><b>max_tokens</b> (optional): Maximum tokens in response (default:
1024)
+ * <li><b>json_prefill</b> (optional): When true, prefills assistant
response with "{" to enforce
+ * JSON output. Automatically disabled when tools are passed. (default:
true)
+ * <li><b>strict_tools</b> (optional): When true, tool calls adhere strictly
to the JSON schema.
+ * (default: false)
+ * <li><b>tools</b> (optional): List of tool names available for the model
to use
+ * <li><b>additional_kwargs</b> (optional): Additional parameters (top_k,
top_p, stop_sequences)
+ * </ul>
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * public class MyAgent extends Agent {
+ * @ChatModelSetup
+ * public static ResourceDesc anthropic() {
+ * return
ResourceDescriptor.Builder.newBuilder(AnthropicChatModelSetup.class.getName())
+ * .addInitialArgument("connection", "myAnthropicConnection")
+ * .addInitialArgument("model", "claude-sonnet-4-20250514")
+ * .addInitialArgument("temperature", 0.3d)
+ * .addInitialArgument("max_tokens", 2048)
+ * .addInitialArgument("strict_tools", true)
+ * .addInitialArgument("tools", List.of("convertTemperature",
"calculateBMI"))
+ * .addInitialArgument(
+ * "additional_kwargs",
+ * Map.of("top_k", 40, "top_p", 0.9))
+ * .build();
+ * }
+ * }
+ * }</pre>
+ */
+public class AnthropicChatModelSetup extends BaseChatModelSetup {
+
+ private static final String DEFAULT_MODEL = "claude-sonnet-4-20250514";
+ private static final double DEFAULT_TEMPERATURE = 0.1d;
+ private static final long DEFAULT_MAX_TOKENS = 1024L;
+ private static final boolean DEFAULT_JSON_PREFILL = true;
+ private static final boolean DEFAULT_STRICT_TOOLS = false;
+
+ private final Double temperature;
+ private final Long maxTokens;
+ private final Boolean jsonPrefill;
+ private final Boolean strictTools;
+ private final Map<String, Object> additionalArguments;
+
+ public AnthropicChatModelSetup(
+ ResourceDescriptor descriptor, BiFunction<String, ResourceType,
Resource> getResource) {
+ super(descriptor, getResource);
+ this.temperature =
+
Optional.ofNullable(descriptor.<Number>getArgument("temperature"))
+ .map(Number::doubleValue)
+ .orElse(DEFAULT_TEMPERATURE);
+ if (this.temperature < 0.0 || this.temperature > 1.0) {
+ throw new IllegalArgumentException("temperature must be between
0.0 and 1.0");
+ }
+
+ this.maxTokens =
+
Optional.ofNullable(descriptor.<Number>getArgument("max_tokens"))
+ .map(Number::longValue)
+ .orElse(DEFAULT_MAX_TOKENS);
+ if (this.maxTokens <= 0) {
+ throw new IllegalArgumentException("max_tokens must be greater
than 0");
+ }
+
+ this.jsonPrefill =
+
Optional.ofNullable(descriptor.<Boolean>getArgument("json_prefill"))
+ .orElse(DEFAULT_JSON_PREFILL);
+
+ this.strictTools =
+
Optional.ofNullable(descriptor.<Boolean>getArgument("strict_tools"))
+ .orElse(DEFAULT_STRICT_TOOLS);
+
+ this.additionalArguments =
+ Optional.ofNullable(
+ descriptor.<Map<String,
Object>>getArgument("additional_kwargs"))
+ .map(HashMap::new)
+ .orElseGet(HashMap::new);
+
+ if (this.model == null || this.model.isBlank()) {
+ this.model = DEFAULT_MODEL;
+ }
+ }
+
+ public AnthropicChatModelSetup(
+ String model,
+ double temperature,
+ long maxTokens,
+ Map<String, Object> additionalArguments,
+ List<String> tools,
+ BiFunction<String, ResourceType, Resource> getResource) {
+ this(
+ createDescriptor(model, temperature, maxTokens,
additionalArguments, tools),
+ getResource);
+ }
+
+ @Override
+ public Map<String, Object> getParameters() {
+ Map<String, Object> parameters = new HashMap<>();
+ if (model != null) {
+ parameters.put("model", model);
+ }
+ parameters.put("temperature", temperature);
+ parameters.put("max_tokens", maxTokens);
+ parameters.put("json_prefill", jsonPrefill);
+ parameters.put("strict_tools", strictTools);
+ if (additionalArguments != null && !additionalArguments.isEmpty()) {
+ parameters.put("additional_kwargs", additionalArguments);
+ }
+ return parameters;
+ }
+
+ private static ResourceDescriptor createDescriptor(
+ String model,
+ double temperature,
+ long maxTokens,
+ Map<String, Object> additionalArguments,
+ List<String> tools) {
+ ResourceDescriptor.Builder builder =
+
ResourceDescriptor.Builder.newBuilder(AnthropicChatModelSetup.class.getName())
+ .addInitialArgument("model", model)
+ .addInitialArgument("temperature", temperature)
+ .addInitialArgument("max_tokens", maxTokens);
+
+ if (additionalArguments != null && !additionalArguments.isEmpty()) {
+ builder.addInitialArgument("additional_kwargs",
additionalArguments);
+ }
+ if (tools != null && !tools.isEmpty()) {
+ builder.addInitialArgument("tools", tools);
+ }
+
+ return builder.build();
+ }
+}
diff --git a/integrations/chat-models/pom.xml b/integrations/chat-models/pom.xml
index 4a09cc6..cd3a920 100644
--- a/integrations/chat-models/pom.xml
+++ b/integrations/chat-models/pom.xml
@@ -31,6 +31,7 @@ under the License.
<packaging>pom</packaging>
<modules>
+ <module>anthropic</module>
<module>azureai</module>
<module>ollama</module>
<module>openai</module>
diff --git a/integrations/pom.xml b/integrations/pom.xml
index dd21927..1109f86 100644
--- a/integrations/pom.xml
+++ b/integrations/pom.xml
@@ -33,6 +33,7 @@ under the License.
<properties>
<ollama4j.version>1.1.5</ollama4j.version>
<openai.version>4.8.0</openai.version>
+ <anthropic.version>2.11.1</anthropic.version>
</properties>
<modules>