This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git


The following commit(s) were added to refs/heads/main by this push:
     new ef22016  [Feature][Java] Add Anthropic chat model integration (#369)
ef22016 is described below

commit ef2201699427c98c7d560ec1d1368cf2c8286f2d
Author: Xiang Li <[email protected]>
AuthorDate: Sun Dec 7 18:19:16 2025 -0800

    [Feature][Java] Add Anthropic chat model integration (#369)
---
 .../pom.xml                                        |   5 +
 .../test/ChatModelIntegrationAgent.java            |  17 +
 .../integration/test/ChatModelIntegrationTest.java |   2 +-
 integrations/{ => chat-models/anthropic}/pom.xml   |  34 +-
 .../anthropic/AnthropicChatModelConnection.java    | 509 +++++++++++++++++++++
 .../anthropic/AnthropicChatModelSetup.java         | 175 +++++++
 integrations/chat-models/pom.xml                   |   1 +
 integrations/pom.xml                               |   1 +
 8 files changed, 731 insertions(+), 13 deletions(-)

diff --git a/e2e-test/flink-agents-end-to-end-tests-integration/pom.xml 
b/e2e-test/flink-agents-end-to-end-tests-integration/pom.xml
index 65c7d12..1a57cf0 100644
--- a/e2e-test/flink-agents-end-to-end-tests-integration/pom.xml
+++ b/e2e-test/flink-agents-end-to-end-tests-integration/pom.xml
@@ -59,6 +59,11 @@ under the License.
             <artifactId>flink-clients</artifactId>
             <version>${flink.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-agents-integrations-chat-models-anthropic</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             
<artifactId>flink-agents-integrations-chat-models-azureai</artifactId>
diff --git 
a/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ChatModelIntegrationAgent.java
 
b/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ChatModelIntegrationAgent.java
index 9056f99..6ce40a5 100644
--- 
a/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ChatModelIntegrationAgent.java
+++ 
b/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ChatModelIntegrationAgent.java
@@ -33,6 +33,8 @@ import org.apache.flink.agents.api.event.ChatRequestEvent;
 import org.apache.flink.agents.api.event.ChatResponseEvent;
 import org.apache.flink.agents.api.resource.ResourceDescriptor;
 import org.apache.flink.agents.api.resource.ResourceType;
+import 
org.apache.flink.agents.integrations.chatmodels.anthropic.AnthropicChatModelConnection;
+import 
org.apache.flink.agents.integrations.chatmodels.anthropic.AnthropicChatModelSetup;
 import 
org.apache.flink.agents.integrations.chatmodels.azureai.AzureAIChatModelConnection;
 import 
org.apache.flink.agents.integrations.chatmodels.azureai.AzureAIChatModelSetup;
 import 
org.apache.flink.agents.integrations.chatmodels.ollama.OllamaChatModelConnection;
@@ -87,6 +89,13 @@ public class ChatModelIntegrationAgent extends Agent {
             return 
ResourceDescriptor.Builder.newBuilder(OpenAIChatModelConnection.class.getName())
                     .addInitialArgument("api_key", apiKey)
                     .build();
+        } else if (provider.equals("ANTHROPIC")) {
+            String apiKey = System.getenv().get("ANTHROPIC_API_KEY");
+            return ResourceDescriptor.Builder.newBuilder(
+                            AnthropicChatModelConnection.class.getName())
+                    .addInitialArgument("api_key", apiKey)
+                    .addInitialArgument("timeout", 240)
+                    .build();
         } else {
             throw new RuntimeException(String.format("Unknown model provider 
%s", provider));
         }
@@ -112,6 +121,14 @@ public class ChatModelIntegrationAgent extends Agent {
                             "tools",
                             List.of("calculateBMI", "convertTemperature", 
"createRandomNumber"))
                     .build();
+        } else if (provider.equals("ANTHROPIC")) {
+            return 
ResourceDescriptor.Builder.newBuilder(AnthropicChatModelSetup.class.getName())
+                    .addInitialArgument("connection", "chatModelConnection")
+                    .addInitialArgument("model", "claude-sonnet-4-20250514")
+                    .addInitialArgument(
+                            "tools",
+                            List.of("calculateBMI", "convertTemperature", 
"createRandomNumber"))
+                    .build();
         } else if (provider.equals("OPENAI")) {
             return 
ResourceDescriptor.Builder.newBuilder(OpenAIChatModelSetup.class.getName())
                     .addInitialArgument("connection", "chatModelConnection")
diff --git 
a/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ChatModelIntegrationTest.java
 
b/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ChatModelIntegrationTest.java
index d1387b2..af2bf8c 100644
--- 
a/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ChatModelIntegrationTest.java
+++ 
b/e2e-test/flink-agents-end-to-end-tests-integration/src/test/java/org/apache/flink/agents/integration/test/ChatModelIntegrationTest.java
@@ -52,7 +52,7 @@ public class ChatModelIntegrationTest extends 
OllamaPreparationUtils {
     }
 
     @ParameterizedTest()
-    @ValueSource(strings = {"OLLAMA", "AZURE", "OPENAI"})
+    @ValueSource(strings = {"ANTHROPIC", "AZURE", "OLLAMA", "OPENAI"})
     public void testChatModeIntegration(String provider) throws Exception {
         Assumptions.assumeTrue(
                 (OLLAMA.equals(provider) && ollamaReady)
diff --git a/integrations/pom.xml b/integrations/chat-models/anthropic/pom.xml
similarity index 56%
copy from integrations/pom.xml
copy to integrations/chat-models/anthropic/pom.xml
index dd21927..e59795f 100644
--- a/integrations/pom.xml
+++ b/integrations/chat-models/anthropic/pom.xml
@@ -22,22 +22,32 @@ under the License.
 
     <parent>
         <groupId>org.apache.flink</groupId>
-        <artifactId>flink-agents</artifactId>
+        <artifactId>flink-agents-integrations-chat-models</artifactId>
         <version>0.2-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
     </parent>
 
-    <artifactId>flink-agents-integrations</artifactId>
-    <name>Flink Agents : Integrations:</name>
-    <packaging>pom</packaging>
+    <artifactId>flink-agents-integrations-chat-models-anthropic</artifactId>
+    <name>Flink Agents : Integrations: Chat Models: Anthropic</name>
+    <packaging>jar</packaging>
 
-    <properties>
-        <ollama4j.version>1.1.5</ollama4j.version>
-        <openai.version>4.8.0</openai.version>
-    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-agents-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-agents-plan</artifactId>
+            <version>${project.version}</version>
+        </dependency>
 
-    <modules>
-        <module>chat-models</module>
-        <module>embedding-models</module>
-    </modules>
+        <dependency>
+            <groupId>com.anthropic</groupId>
+            <artifactId>anthropic-java</artifactId>
+            <version>${anthropic.version}</version>
+        </dependency>
+    </dependencies>
 
 </project>
diff --git 
a/integrations/chat-models/anthropic/src/main/java/org/apache/flink/agents/integrations/chatmodels/anthropic/AnthropicChatModelConnection.java
 
b/integrations/chat-models/anthropic/src/main/java/org/apache/flink/agents/integrations/chatmodels/anthropic/AnthropicChatModelConnection.java
new file mode 100644
index 0000000..ab30c97
--- /dev/null
+++ 
b/integrations/chat-models/anthropic/src/main/java/org/apache/flink/agents/integrations/chatmodels/anthropic/AnthropicChatModelConnection.java
@@ -0,0 +1,509 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.integrations.chatmodels.anthropic;
+
+import com.anthropic.client.AnthropicClient;
+import com.anthropic.client.okhttp.AnthropicOkHttpClient;
+import com.anthropic.core.JsonValue;
+import com.anthropic.models.messages.ContentBlock;
+import com.anthropic.models.messages.ContentBlockParam;
+import com.anthropic.models.messages.Message;
+import com.anthropic.models.messages.MessageCreateParams;
+import com.anthropic.models.messages.MessageParam;
+import com.anthropic.models.messages.Model;
+import com.anthropic.models.messages.TextBlockParam;
+import com.anthropic.models.messages.Tool;
+import com.anthropic.models.messages.ToolResultBlockParam;
+import com.anthropic.models.messages.ToolUseBlockParam;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.agents.api.chat.messages.ChatMessage;
+import org.apache.flink.agents.api.chat.messages.MessageRole;
+import org.apache.flink.agents.api.chat.model.BaseChatModelConnection;
+import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.resource.ResourceType;
+import org.apache.flink.agents.api.tools.ToolMetadata;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+
+/**
+ * A chat model integration for the Anthropic Chat service using the official 
Java SDK.
+ *
+ * <p>Supported connection parameters:
+ *
+ * <ul>
+ *   <li><b>api_key</b> (required): Anthropic API key
+ *   <li><b>timeout</b> (optional): Timeout in seconds for API requests
+ *   <li><b>max_retries</b> (optional): Maximum number of retry attempts 
(default: 2)
+ * </ul>
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * public class MyAgent extends Agent {
+ *   @ChatModelConnection
+ *   public static ResourceDesc anthropic() {
+ *     return 
ResourceDescriptor.Builder.newBuilder(AnthropicChatModelConnection.class.getName())
+ *             .addInitialArgument("api_key", 
System.getenv("ANTHROPIC_API_KEY"))
+ *             .addInitialArgument("timeout", 120)
+ *             .addInitialArgument("max_retries", 3)
+ *             .build();
+ *   }
+ * }
+ * }</pre>
+ */
+public class AnthropicChatModelConnection extends BaseChatModelConnection {
+
+    private static final TypeReference<Map<String, Object>> MAP_TYPE = new 
TypeReference<>() {};
+
+    private final ObjectMapper mapper = new ObjectMapper();
+    private final AnthropicClient client;
+    private final String defaultModel;
+
+    public AnthropicChatModelConnection(
+            ResourceDescriptor descriptor, BiFunction<String, ResourceType, 
Resource> getResource) {
+        super(descriptor, getResource);
+
+        String apiKey = descriptor.getArgument("api_key");
+        if (apiKey == null || apiKey.isBlank()) {
+            throw new IllegalArgumentException("api_key should not be null or 
empty.");
+        }
+
+        AnthropicOkHttpClient.Builder builder = 
AnthropicOkHttpClient.builder().apiKey(apiKey);
+
+        Integer timeoutSeconds = descriptor.getArgument("timeout");
+        if (timeoutSeconds != null && timeoutSeconds > 0) {
+            builder.timeout(Duration.ofSeconds(timeoutSeconds));
+        }
+
+        Integer maxRetries = descriptor.getArgument("max_retries");
+        if (maxRetries != null && maxRetries >= 0) {
+            builder.maxRetries(maxRetries);
+        }
+
+        this.defaultModel = descriptor.getArgument("model");
+        this.client = builder.build();
+    }
+
+    @Override
+    public ChatMessage chat(
+            List<ChatMessage> messages,
+            List<org.apache.flink.agents.api.tools.Tool> tools,
+            Map<String, Object> arguments) {
+        try {
+            // Check if JSON prefill is requested before building request 
(arguments may be
+            // modified).
+            boolean jsonPrefillRequested =
+                    arguments != null && 
Boolean.TRUE.equals(arguments.get("json_prefill"));
+            // JSON prefill is automatically disabled when tools are passed in 
the request,
+            // because it interferes with native tool calling.
+            boolean hasToolsInRequest = tools != null && !tools.isEmpty();
+            boolean jsonPrefillApplied = jsonPrefillRequested && 
!hasToolsInRequest;
+
+            MessageCreateParams params = buildRequest(messages, tools, 
arguments);
+            Message response = client.messages().create(params);
+            return convertResponse(response, jsonPrefillApplied);
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to call Anthropic messages 
API.", e);
+        }
+    }
+
+    private MessageCreateParams buildRequest(
+            List<ChatMessage> messages,
+            List<org.apache.flink.agents.api.tools.Tool> tools,
+            Map<String, Object> rawArguments) {
+        Map<String, Object> arguments =
+                rawArguments != null ? new HashMap<>(rawArguments) : new 
HashMap<>();
+
+        Object modelObj = arguments.remove("model");
+        String modelName = modelObj != null ? modelObj.toString() : 
this.defaultModel;
+        if (modelName == null || modelName.isBlank()) {
+            modelName = this.defaultModel;
+        }
+
+        List<TextBlockParam> systemBlocks = extractSystemMessages(messages);
+
+        List<MessageParam> anthropicMessages =
+                messages.stream()
+                        .filter(m -> m.getRole() != MessageRole.SYSTEM)
+                        .map(this::convertToAnthropicMessage)
+                        .collect(Collectors.toList());
+
+        MessageCreateParams.Builder builder =
+                MessageCreateParams.builder()
+                        .model(Model.of(modelName))
+                        .messages(anthropicMessages);
+
+        if (!systemBlocks.isEmpty()) {
+            builder.systemOfTextBlockParams(systemBlocks);
+        }
+
+        // Handle strict tools - enables structured outputs for tool use
+        Object strictTools = arguments.remove("strict_tools");
+        boolean strictToolsEnabled = Boolean.TRUE.equals(strictTools);
+
+        if (tools != null && !tools.isEmpty()) {
+            for (Tool tool : convertTools(tools, strictToolsEnabled)) {
+                builder.addTool(tool);
+            }
+        }
+
+        // Add beta header for strict tool use
+        // 
https://platform.claude.com/docs/en/build-with-claude/structured-outputs#strict-tool-use
+        if (strictToolsEnabled) {
+            builder.putAdditionalHeader("anthropic-beta", 
"structured-outputs-2025-11-13");
+        }
+
+        Object maxTokens = arguments.remove("max_tokens");
+        if (maxTokens instanceof Number) {
+            builder.maxTokens(((Number) maxTokens).longValue());
+        }
+
+        Object temperature = arguments.remove("temperature");
+        if (temperature instanceof Number) {
+            builder.temperature(((Number) temperature).doubleValue());
+        }
+
+        @SuppressWarnings("unchecked")
+        Map<String, Object> additionalKwargs =
+                (Map<String, Object>) arguments.remove("additional_kwargs");
+        if (additionalKwargs != null) {
+            applyAdditionalKwargs(builder, additionalKwargs);
+        }
+
+        // Handle JSON prefill - append a prefilled assistant message with "{" 
to enforce JSON
+        // output. Note: JSON prefill is incompatible with tool use as it 
forces the model to output
+        // JSON text instead of using native tool_use content blocks. 
Automatically disable
+        // json_prefill when tools are actually passed in the request.
+        Object jsonPrefill = arguments.remove("json_prefill");
+        boolean hasToolsInRequest = tools != null && !tools.isEmpty();
+        if (Boolean.TRUE.equals(jsonPrefill) && !hasToolsInRequest) {
+            anthropicMessages.add(
+                    
MessageParam.builder().role(MessageParam.Role.ASSISTANT).content("{").build());
+            builder.messages(anthropicMessages);
+        }
+
+        return builder.build();
+    }
+
+    private List<TextBlockParam> extractSystemMessages(List<ChatMessage> 
messages) {
+        return messages.stream()
+                .filter(m -> m.getRole() == MessageRole.SYSTEM)
+                .map(m -> 
TextBlockParam.builder().text(m.getContent()).build())
+                .collect(Collectors.toList());
+    }
+
+    private MessageParam convertToAnthropicMessage(ChatMessage message) {
+        MessageRole role = message.getRole();
+        String content = Optional.ofNullable(message.getContent()).orElse("");
+
+        switch (role) {
+            case USER:
+                return 
MessageParam.builder().role(MessageParam.Role.USER).content(content).build();
+
+            case ASSISTANT:
+                List<Map<String, Object>> toolCalls = message.getToolCalls();
+                if (toolCalls != null && !toolCalls.isEmpty()) {
+                    List<ContentBlockParam> contentBlocks = new ArrayList<>();
+                    if (!content.isEmpty()) {
+                        contentBlocks.add(
+                                ContentBlockParam.ofText(
+                                        
TextBlockParam.builder().text(content).build()));
+                    }
+                    contentBlocks.addAll(convertToolCallsToToolUse(toolCalls));
+                    return MessageParam.builder()
+                            .role(MessageParam.Role.ASSISTANT)
+                            .contentOfBlockParams(contentBlocks)
+                            .build();
+                } else {
+                    return MessageParam.builder()
+                            .role(MessageParam.Role.ASSISTANT)
+                            .content(content)
+                            .build();
+                }
+
+            case TOOL:
+                Object toolCallId = message.getExtraArgs().get("externalId");
+                if (toolCallId == null) {
+                    throw new IllegalArgumentException(
+                            "Tool message must have an externalId in 
extraArgs.");
+                }
+                ToolResultBlockParam toolResult =
+                        ToolResultBlockParam.builder()
+                                .toolUseId(toolCallId.toString())
+                                .content(content)
+                                .build();
+                return MessageParam.builder()
+                        .role(MessageParam.Role.USER)
+                        
.contentOfBlockParams(List.of(ContentBlockParam.ofToolResult(toolResult)))
+                        .build();
+
+            default:
+                throw new IllegalArgumentException("Unsupported role: " + 
role);
+        }
+    }
+
+    private List<ContentBlockParam> convertToolCallsToToolUse(List<Map<String, 
Object>> toolCalls) {
+        List<ContentBlockParam> blocks = new ArrayList<>();
+        for (Map<String, Object> call : toolCalls) {
+            Object type = call.getOrDefault("type", "function");
+            if (!"function".equals(String.valueOf(type))) {
+                continue;
+            }
+
+            Map<String, Object> functionPayload = toMap(call.get("function"));
+            String functionName = String.valueOf(functionPayload.get("name"));
+            Object arguments = functionPayload.get("arguments");
+            Map<String, Object> inputMap = toMap(arguments);
+
+            Object originalIdObj = call.get("original_id");
+            if (originalIdObj == null) {
+                throw new IllegalArgumentException(
+                        "Tool call must have an original_id for Anthropic.");
+            }
+
+            ToolUseBlockParam toolUse =
+                    ToolUseBlockParam.builder()
+                            .id(originalIdObj.toString())
+                            .name(functionName)
+                            .input(toJsonValue(inputMap))
+                            .build();
+
+            blocks.add(ContentBlockParam.ofToolUse(toolUse));
+        }
+        return blocks;
+    }
+
+    private List<Tool> convertTools(
+            List<org.apache.flink.agents.api.tools.Tool> tools, boolean 
strictToolsEnabled) {
+        List<Tool> anthropicTools = new ArrayList<>(tools.size());
+        for (org.apache.flink.agents.api.tools.Tool tool : tools) {
+            ToolMetadata metadata = tool.getMetadata();
+            Tool.Builder toolBuilder =
+                    
Tool.builder().name(metadata.getName()).description(metadata.getDescription());
+
+            String schema = metadata.getInputSchema();
+            if (schema != null && !schema.isBlank()) {
+                toolBuilder.inputSchema(parseToolInputSchema(schema));
+            }
+
+            if (strictToolsEnabled) {
+                toolBuilder.putAdditionalProperty("strict", 
JsonValue.from(true));
+            }
+
+            anthropicTools.add(toolBuilder.build());
+        }
+        return anthropicTools;
+    }
+
+    private Tool.InputSchema parseToolInputSchema(String schemaJson) {
+        try {
+            JsonNode root = mapper.readTree(schemaJson);
+            if (root == null || !root.isObject()) {
+                return Tool.InputSchema.builder().build();
+            }
+
+            Tool.InputSchema.Builder builder = Tool.InputSchema.builder();
+            root.fields()
+                    .forEachRemaining(
+                            entry ->
+                                    builder.putAdditionalProperty(
+                                            entry.getKey(),
+                                            
JsonValue.fromJsonNode(entry.getValue())));
+
+            return builder.build();
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException("Failed to parse tool schema JSON.", e);
+        }
+    }
+
+    private ChatMessage convertResponse(Message response, boolean 
jsonPrefillApplied) {
+        List<ContentBlock> contentBlocks = response.content();
+        if (contentBlocks.isEmpty()) {
+            throw new IllegalStateException("Anthropic response did not 
contain any content.");
+        }
+
+        StringBuilder textContent = new StringBuilder();
+        // If JSON prefill was used, prepend "{" since the response only 
contains the continuation
+        if (jsonPrefillApplied) {
+            textContent.append("{");
+        }
+        List<Map<String, Object>> toolCalls = new ArrayList<>();
+
+        for (ContentBlock block : contentBlocks) {
+            if (block.isText()) {
+                block.text()
+                        .ifPresent(
+                                textBlock -> {
+                                    textContent.append(textBlock.text());
+                                });
+            } else if (block.isToolUse()) {
+                block.toolUse()
+                        .ifPresent(
+                                toolUse -> {
+                                    String toolUseId = toolUse.id();
+                                    Map<String, Object> toolCall = new 
LinkedHashMap<>();
+                                    toolCall.put("id", toolUseId);
+                                    toolCall.put("type", "function");
+
+                                    Map<String, Object> functionMap = new 
LinkedHashMap<>();
+                                    functionMap.put("name", toolUse.name());
+                                    JsonValue inputValue = toolUse._input();
+                                    Map<String, Object> inputMap = 
jsonValueToMap(inputValue);
+                                    functionMap.put("arguments", inputMap);
+                                    toolCall.put("function", functionMap);
+                                    toolCall.put("original_id", toolUseId);
+
+                                    toolCalls.add(toolCall);
+                                });
+            }
+        }
+
+        String finalText = textContent.toString();
+
+        // If the response has no tool calls, try to extract JSON from 
markdown code blocks.
+        if (toolCalls.isEmpty()) {
+            finalText = extractJsonFromMarkdown(finalText);
+        }
+
+        ChatMessage chatMessage = ChatMessage.assistant(finalText);
+        if (!toolCalls.isEmpty()) {
+            chatMessage.setToolCalls(toolCalls);
+        }
+
+        return chatMessage;
+    }
+
+    /**
+     * Extracts JSON content from a string that may contain markdown code 
blocks.
+     *
+     * <p>Claude often wraps JSON responses in markdown code blocks like 
{@code ```json ... ```},
+     * especially when tools are configured (since json_prefill is disabled). 
This method extracts
+     * the JSON content from such responses. If no code block is found, the 
original content is
+     * returned unchanged.
+     *
+     * @param content The response content that may contain markdown-wrapped 
JSON
+     * @return The extracted JSON string, or the original content if no code 
block is found
+     */
+    private String extractJsonFromMarkdown(String content) {
+        if (content == null) {
+            return null;
+        }
+
+        String trimmed = content.trim();
+
+        // Try to find JSON in markdown code block (```json ... ``` or ``` ... 
```)
+        int jsonBlockStart = trimmed.indexOf("```json");
+        int genericBlockStart = trimmed.indexOf("```");
+
+        int contentStart;
+
+        if (jsonBlockStart != -1) {
+            contentStart = jsonBlockStart + 7; // length of "```json"
+        } else if (genericBlockStart != -1) {
+            contentStart = genericBlockStart + 3; // length of "```"
+        } else {
+            return content;
+        }
+
+        // Find the closing ```
+        int blockEnd = trimmed.indexOf("```", contentStart);
+        if (blockEnd == -1) {
+            return content;
+        }
+
+        // Extract content between the markers
+        return trimmed.substring(contentStart, blockEnd).trim();
+    }
+
+    private void applyAdditionalKwargs(
+            MessageCreateParams.Builder builder, Map<String, Object> kwargs) {
+        for (Map.Entry<String, Object> entry : kwargs.entrySet()) {
+            String key = entry.getKey();
+            Object value = entry.getValue();
+
+            switch (key) {
+                case "top_k":
+                    if (value instanceof Number) {
+                        builder.topK(((Number) value).longValue());
+                    }
+                    break;
+                case "top_p":
+                    if (value instanceof Number) {
+                        builder.topP(((Number) value).doubleValue());
+                    }
+                    break;
+                case "stop_sequences":
+                    if (value instanceof List) {
+                        @SuppressWarnings("unchecked")
+                        List<String> stopSequences = (List<String>) value;
+                        builder.stopSequences(stopSequences);
+                    }
+                    break;
+                default:
+                    builder.putAdditionalBodyProperty(key, toJsonValue(value));
+                    break;
+            }
+        }
+    }
+
+    private Map<String, Object> toMap(Object value) {
+        if (value instanceof Map) {
+            @SuppressWarnings("unchecked")
+            Map<String, Object> casted = (Map<String, Object>) value;
+            return new LinkedHashMap<>(casted);
+        }
+        if (value == null) {
+            return new LinkedHashMap<>();
+        }
+        return mapper.convertValue(value, MAP_TYPE);
+    }
+
+    private JsonValue toJsonValue(Object value) {
+        if (value instanceof JsonValue) {
+            return (JsonValue) value;
+        }
+        if (value instanceof String
+                || value instanceof Number
+                || value instanceof Boolean
+                || value == null) {
+            return JsonValue.from(value);
+        }
+        return JsonValue.fromJsonNode(mapper.valueToTree(value));
+    }
+
+    private Map<String, Object> jsonValueToMap(JsonValue jsonValue) {
+        try {
+            String jsonString = mapper.writeValueAsString(jsonValue);
+            return mapper.readValue(jsonString, MAP_TYPE);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException("Failed to convert JsonValue to Map.", 
e);
+        }
+    }
+}
diff --git 
a/integrations/chat-models/anthropic/src/main/java/org/apache/flink/agents/integrations/chatmodels/anthropic/AnthropicChatModelSetup.java
 
b/integrations/chat-models/anthropic/src/main/java/org/apache/flink/agents/integrations/chatmodels/anthropic/AnthropicChatModelSetup.java
new file mode 100644
index 0000000..145213c
--- /dev/null
+++ 
b/integrations/chat-models/anthropic/src/main/java/org/apache/flink/agents/integrations/chatmodels/anthropic/AnthropicChatModelSetup.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.agents.integrations.chatmodels.anthropic;
+
+import org.apache.flink.agents.api.chat.model.BaseChatModelSetup;
+import org.apache.flink.agents.api.resource.Resource;
+import org.apache.flink.agents.api.resource.ResourceDescriptor;
+import org.apache.flink.agents.api.resource.ResourceType;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.BiFunction;
+
+/**
+ * Chat model setup for the Anthropic Messages API.
+ *
+ * <p>Responsible for providing per-chat configuration such as model, 
temperature, max_tokens, tool
+ * bindings, and additional Anthropic parameters. The setup delegates 
execution to {@link
+ * AnthropicChatModelConnection}.
+ *
+ * <p>Supported parameters:
+ *
+ * <ul>
+ *   <li><b>connection</b> (required): Name of the 
AnthropicChatModelConnection resource
+ *   <li><b>model</b> (optional): Model name (default: 
claude-sonnet-4-20250514)
+ *   <li><b>temperature</b> (optional): Sampling temperature 0.0-1.0 (default: 
0.1)
+ *   <li><b>max_tokens</b> (optional): Maximum tokens in response (default: 
1024)
+ *   <li><b>json_prefill</b> (optional): When true, prefills assistant 
response with "{" to enforce
+ *       JSON output. Automatically disabled when tools are passed. (default: 
true)
+ *   <li><b>strict_tools</b> (optional): When true, tool calls adhere strictly 
to the JSON schema.
+ *       (default: false)
+ *   <li><b>tools</b> (optional): List of tool names available for the model 
to use
+ *   <li><b>additional_kwargs</b> (optional): Additional parameters (top_k, 
top_p, stop_sequences)
+ * </ul>
+ *
+ * <p>Example usage:
+ *
+ * <pre>{@code
+ * public class MyAgent extends Agent {
+ *   @ChatModelSetup
+ *   public static ResourceDesc anthropic() {
+ *     return 
ResourceDescriptor.Builder.newBuilder(AnthropicChatModelSetup.class.getName())
+ *             .addInitialArgument("connection", "myAnthropicConnection")
+ *             .addInitialArgument("model", "claude-sonnet-4-20250514")
+ *             .addInitialArgument("temperature", 0.3d)
+ *             .addInitialArgument("max_tokens", 2048)
+ *             .addInitialArgument("strict_tools", true)
+ *             .addInitialArgument("tools", List.of("convertTemperature", 
"calculateBMI"))
+ *             .addInitialArgument(
+ *                     "additional_kwargs",
+ *                     Map.of("top_k", 40, "top_p", 0.9))
+ *             .build();
+ *   }
+ * }
+ * }</pre>
+ */
+public class AnthropicChatModelSetup extends BaseChatModelSetup {
+
+    private static final String DEFAULT_MODEL = "claude-sonnet-4-20250514";
+    private static final double DEFAULT_TEMPERATURE = 0.1d;
+    private static final long DEFAULT_MAX_TOKENS = 1024L;
+    private static final boolean DEFAULT_JSON_PREFILL = true;
+    private static final boolean DEFAULT_STRICT_TOOLS = false;
+
+    private final Double temperature;
+    private final Long maxTokens;
+    private final Boolean jsonPrefill;
+    private final Boolean strictTools;
+    private final Map<String, Object> additionalArguments;
+
+    public AnthropicChatModelSetup(
+            ResourceDescriptor descriptor, BiFunction<String, ResourceType, 
Resource> getResource) {
+        super(descriptor, getResource);
+        this.temperature =
+                
Optional.ofNullable(descriptor.<Number>getArgument("temperature"))
+                        .map(Number::doubleValue)
+                        .orElse(DEFAULT_TEMPERATURE);
+        if (this.temperature < 0.0 || this.temperature > 1.0) {
+            throw new IllegalArgumentException("temperature must be between 
0.0 and 1.0");
+        }
+
+        this.maxTokens =
+                
Optional.ofNullable(descriptor.<Number>getArgument("max_tokens"))
+                        .map(Number::longValue)
+                        .orElse(DEFAULT_MAX_TOKENS);
+        if (this.maxTokens <= 0) {
+            throw new IllegalArgumentException("max_tokens must be greater 
than 0");
+        }
+
+        this.jsonPrefill =
+                
Optional.ofNullable(descriptor.<Boolean>getArgument("json_prefill"))
+                        .orElse(DEFAULT_JSON_PREFILL);
+
+        this.strictTools =
+                
Optional.ofNullable(descriptor.<Boolean>getArgument("strict_tools"))
+                        .orElse(DEFAULT_STRICT_TOOLS);
+
+        this.additionalArguments =
+                Optional.ofNullable(
+                                descriptor.<Map<String, 
Object>>getArgument("additional_kwargs"))
+                        .map(HashMap::new)
+                        .orElseGet(HashMap::new);
+
+        if (this.model == null || this.model.isBlank()) {
+            this.model = DEFAULT_MODEL;
+        }
+    }
+
+    public AnthropicChatModelSetup(
+            String model,
+            double temperature,
+            long maxTokens,
+            Map<String, Object> additionalArguments,
+            List<String> tools,
+            BiFunction<String, ResourceType, Resource> getResource) {
+        this(
+                createDescriptor(model, temperature, maxTokens, 
additionalArguments, tools),
+                getResource);
+    }
+
+    @Override
+    public Map<String, Object> getParameters() {
+        Map<String, Object> parameters = new HashMap<>();
+        if (model != null) {
+            parameters.put("model", model);
+        }
+        parameters.put("temperature", temperature);
+        parameters.put("max_tokens", maxTokens);
+        parameters.put("json_prefill", jsonPrefill);
+        parameters.put("strict_tools", strictTools);
+        if (additionalArguments != null && !additionalArguments.isEmpty()) {
+            parameters.put("additional_kwargs", additionalArguments);
+        }
+        return parameters;
+    }
+
+    private static ResourceDescriptor createDescriptor(
+            String model,
+            double temperature,
+            long maxTokens,
+            Map<String, Object> additionalArguments,
+            List<String> tools) {
+        ResourceDescriptor.Builder builder =
+                
ResourceDescriptor.Builder.newBuilder(AnthropicChatModelSetup.class.getName())
+                        .addInitialArgument("model", model)
+                        .addInitialArgument("temperature", temperature)
+                        .addInitialArgument("max_tokens", maxTokens);
+
+        if (additionalArguments != null && !additionalArguments.isEmpty()) {
+            builder.addInitialArgument("additional_kwargs", 
additionalArguments);
+        }
+        if (tools != null && !tools.isEmpty()) {
+            builder.addInitialArgument("tools", tools);
+        }
+
+        return builder.build();
+    }
+}
diff --git a/integrations/chat-models/pom.xml b/integrations/chat-models/pom.xml
index 4a09cc6..cd3a920 100644
--- a/integrations/chat-models/pom.xml
+++ b/integrations/chat-models/pom.xml
@@ -31,6 +31,7 @@ under the License.
     <packaging>pom</packaging>
 
     <modules>
+        <module>anthropic</module>
         <module>azureai</module>
         <module>ollama</module>
         <module>openai</module>
diff --git a/integrations/pom.xml b/integrations/pom.xml
index dd21927..1109f86 100644
--- a/integrations/pom.xml
+++ b/integrations/pom.xml
@@ -33,6 +33,7 @@ under the License.
     <properties>
         <ollama4j.version>1.1.5</ollama4j.version>
         <openai.version>4.8.0</openai.version>
+        <anthropic.version>2.11.1</anthropic.version>
     </properties>
 
     <modules>


Reply via email to