This is an automated email from the ASF dual-hosted git repository.

nic443 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new fc5d763d5 feat(ai-proxy): add native Anthropic Messages API protocol support (#13181)
fc5d763d5 is described below

commit fc5d763d5dba4b2365054137b58f8d385a4bf644
Author: Nic <[email protected]>
AuthorDate: Thu Apr 9 10:16:26 2026 +0800

    feat(ai-proxy): add native Anthropic Messages API protocol support (#13181)
---
 apisix/plugins/ai-protocols/anthropic-messages.lua |  350 +++++
 .../anthropic-messages-to-openai-chat.lua          |  447 +++++++
 apisix/plugins/ai-protocols/converters/init.lua    |    3 +
 apisix/plugins/ai-protocols/init.lua               |    6 +-
 apisix/plugins/ai-protocols/openai-chat.lua        |   11 +-
 apisix/plugins/ai-protocols/openai-embeddings.lua  |    4 -
 apisix/plugins/ai-providers/anthropic.lua          |    1 +
 apisix/plugins/ai-providers/base.lua               |   11 +
 apisix/plugins/ai-proxy/base.lua                   |   11 +-
 t/APISIX.pm                                        |   22 +-
 t/plugin/ai-proxy-protocol-conversion.t            | 1384 ++++++++++++++++++++
 11 files changed, 2227 insertions(+), 23 deletions(-)

diff --git a/apisix/plugins/ai-protocols/anthropic-messages.lua 
b/apisix/plugins/ai-protocols/anthropic-messages.lua
new file mode 100644
index 000000000..8e5ccbe46
--- /dev/null
+++ b/apisix/plugins/ai-protocols/anthropic-messages.lua
@@ -0,0 +1,350 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--- Anthropic Messages protocol adapter (client-side).
+-- Handles detection and response parsing for the Anthropic Messages API
+-- format. This adapter is for clients sending Anthropic-format requests.
+--
+-- Conversion logic (Anthropic↔OpenAI) lives in
+-- ai-protocols/converters/anthropic-messages-to-openai-chat.lua.
+
+local core = require("apisix.core")
+local string_sub = string.sub
+local uuid = require("resty.jit-uuid")
+local sse = require("apisix.plugins.ai-transport.sse")
+local table = table
+local type = type
+local ipairs = ipairs
+local setmetatable = setmetatable
+
+local _M = {}
+
+
+--- Decide whether a request speaks the Anthropic Messages API.
+-- The request qualifies when the URI ends with "/v1/messages" and the
+-- decoded body is a table.
+-- @param body any Decoded request body
+-- @param ctx table Request context; only ctx.var.uri is consulted
+-- @return boolean|nil Truthy when the request matches
+function _M.matches(body, ctx)
+    local uri = ctx.var and ctx.var.uri
+    if not uri then
+        return uri
+    end
+    local suffix = string_sub(uri, -12)
+    return suffix == "/v1/messages" and type(body) == "table"
+end
+
+
+--- Report whether the client asked for a streamed (SSE) response.
+-- @param body table Decoded request body
+-- @return boolean true only when body.stream is exactly true
+function _M.is_streaming(body)
+    local stream = body.stream
+    return stream == true
+end
+
+
+
+-- Build a "usage" parse result from an Anthropic usage table.
+-- Anthropic reports input tokens in `message_start` and cumulative output
+-- tokens in `message_delta`, whose usage normally omits `input_tokens`.
+-- The last seen input count is remembered in `state` and reused when a
+-- later usage table lacks it. Otherwise the final usage event would
+-- report prompt_tokens = 0.
+local function anthropic_usage_result(raw, state)
+    local input_tokens = raw.input_tokens
+    if type(input_tokens) ~= "number" then
+        input_tokens = nil
+    end
+    if type(state) == "table" then
+        if input_tokens ~= nil then
+            state.anthropic_input_tokens = input_tokens
+        else
+            input_tokens = state.anthropic_input_tokens
+        end
+    end
+    input_tokens = input_tokens or 0
+
+    local output_tokens = raw.output_tokens
+    if type(output_tokens) ~= "number" then
+        output_tokens = 0
+    end
+
+    return {
+        type = "usage",
+        usage = {
+            prompt_tokens = input_tokens,
+            completion_tokens = output_tokens,
+            total_tokens = input_tokens + output_tokens,
+        },
+        raw_usage = raw,
+    }
+end
+
+
+--- Parse a streaming SSE event in native Anthropic format.
+-- Used when the provider natively supports the Anthropic protocol.
+-- @param event table SSE event with `type` and raw JSON `data` fields
+-- @param ctx table Request context (unused)
+-- @param state table|nil Per-stream mutable state; used to carry the input
+--   token count from message_start over to message_delta
+-- @return table {type="delta", texts={...}} | {type="usage", usage=...,
+--   raw_usage=...} | {type="done"} | {type="skip"}
+function _M.parse_sse_event(event, ctx, state)
+    local event_type = event.type
+
+    if event_type == "content_block_delta" then
+        local data, err = core.json.decode(event.data)
+        if type(data) ~= "table" then
+            core.log.warn("failed to decode SSE data: ", err)
+            return { type = "skip" }
+        end
+        local delta = data.delta
+        if type(delta) == "table" and delta.type == "text_delta"
+                and type(delta.text) == "string" then
+            return {
+                type = "delta",
+                texts = { delta.text },
+            }
+        end
+        return { type = "skip" }
+
+    elseif event_type == "message_delta" then
+        local data, err = core.json.decode(event.data)
+        if type(data) ~= "table" then
+            core.log.warn("failed to decode message_delta: ", err)
+            return { type = "skip" }
+        end
+        if type(data.usage) == "table" then
+            return anthropic_usage_result(data.usage, state)
+        end
+        return { type = "skip" }
+
+    elseif event_type == "message_stop" then
+        return { type = "done" }
+
+    elseif event_type == "message_start" then
+        local data, err = core.json.decode(event.data)
+        if type(data) ~= "table" then
+            core.log.warn("failed to decode message_start: ", err)
+            return { type = "skip" }
+        end
+        local message = data.message
+        if type(message) == "table" and type(message.usage) == "table" then
+            return anthropic_usage_result(message.usage, state)
+        end
+        return { type = "skip" }
+
+    elseif event_type == "error" then
+        local err_data = core.json.decode(event.data)
+        local err_obj = type(err_data) == "table" and err_data.error or nil
+        if type(err_obj) ~= "table" then
+            -- Missing or malformed error payload: log placeholders below.
+            err_obj = {}
+        end
+        core.log.warn("Anthropic SSE error: type=", err_obj.type or "unknown",
+                      ", message=", err_obj.message or "unknown")
+        return { type = "done" }
+    end
+
+    return { type = "skip" }
+end
+
+
+--- Collect the text of every `text` content block in a native Anthropic
+-- (non-streaming) response body.
+-- @param res_body any Decoded response body
+-- @return string|nil The text blocks joined by a single space. Returns nil
+--   when the body is not a table or holds no text blocks.
+function _M.extract_response_text(res_body)
+    if type(res_body) ~= "table" or type(res_body.content) ~= "table" then
+        return nil
+    end
+
+    local pieces = {}
+    for _, block in ipairs(res_body.content) do
+        local is_text_block = type(block) == "table" and block.type == "text"
+        if is_text_block and type(block.text) == "string" then
+            pieces[#pieces + 1] = block.text
+        end
+    end
+
+    if #pieces == 0 then
+        return nil
+    end
+    return table.concat(pieces, " ")
+end
+
+
+--- Build a minimal non-streaming Anthropic Messages request.
+-- @param system_prompt string|nil Optional top-level system prompt
+-- @param user_content any Content of the single user message
+-- @param opts table|nil Optional {max_tokens=number, model=string};
+--   max_tokens defaults to 4096
+-- @return table Request body
+function _M.build_simple_request(system_prompt, user_content, opts)
+    opts = opts or {}
+
+    local request = {
+        messages = {
+            { role = "user", content = user_content },
+        },
+        stream = false,
+        max_tokens = opts.max_tokens or 4096,
+    }
+
+    if system_prompt then
+        request.system = system_prompt
+    end
+    if opts.model then
+        request.model = opts.model
+    end
+
+    return request
+end
+
+
+--- Normalise the usage of a native Anthropic response into the
+-- OpenAI-style {prompt_tokens, completion_tokens, total_tokens} shape.
+-- @param res_body any Decoded response body
+-- @return table|nil Normalised usage, or nil when no usage table is present
+-- @return table|nil The raw Anthropic usage table
+function _M.extract_usage(res_body)
+    local raw = type(res_body) == "table" and res_body.usage
+    if type(raw) ~= "table" then
+        return nil, nil
+    end
+
+    local input_tokens = raw.input_tokens or 0
+    local output_tokens = raw.output_tokens or 0
+    local normalised = {
+        prompt_tokens = input_tokens,
+        completion_tokens = output_tokens,
+        total_tokens = input_tokens + output_tokens,
+    }
+    return normalised, raw
+end
+
+
+--- Extract all text content from a request body for moderation.
+--
+-- Anthropic carries the system prompt in the top-level `system` field
+-- (a string or a list of content blocks), not as a message. In OpenAI
+-- Chat format the system prompt is an ordinary message. The system text
+-- is therefore collected first, so it is moderated like the rest.
+-- Message content may be a string or a list of content blocks. Only
+-- `text` blocks with string text are collected.
+-- @param body table Decoded request body
+-- @return table List of strings, in request order
+function _M.extract_request_content(body)
+    local contents = {}
+
+    -- Append the text of a string or content-block-list value.
+    local function collect(content)
+        if type(content) == "string" then
+            core.table.insert(contents, content)
+        elseif type(content) == "table" then
+            for _, block in ipairs(content) do
+                if type(block) == "table" and block.type == "text"
+                        and type(block.text) == "string" then
+                    core.table.insert(contents, block.text)
+                end
+            end
+        end
+    end
+
+    collect(body.system)
+
+    if type(body.messages) == "table" then
+        for _, message in ipairs(body.messages) do
+            if type(message) == "table" then
+                collect(message.content)
+            end
+        end
+    end
+    return contents
+end
+
+
+--- Get messages in canonical {role, content} format.
+-- The top-level system prompt (a string or a list of text blocks) becomes
+-- a leading "system" entry. Content-block lists are flattened to plain
+-- text by joining their string `text` blocks with a single space. Messages
+-- without any text are omitted. Non-table messages and non-string block
+-- texts are skipped instead of raising an error.
+-- @param body table Decoded request body
+-- @return table List of {role=any, content=string}
+function _M.get_messages(body)
+    -- Flatten a string or content-block list to a string; nil if no text.
+    local function flatten(content)
+        if type(content) == "string" then
+            return content
+        end
+        if type(content) ~= "table" then
+            return nil
+        end
+        local texts = {}
+        for _, block in ipairs(content) do
+            if type(block) == "table" and block.type == "text"
+                    and type(block.text) == "string" then
+                core.table.insert(texts, block.text)
+            end
+        end
+        if #texts == 0 then
+            return nil
+        end
+        return table.concat(texts, " ")
+    end
+
+    local messages = {}
+    local system = flatten(body.system)
+    if system then
+        core.table.insert(messages, {role = "system", content = system})
+    end
+
+    if type(body.messages) == "table" then
+        for _, message in ipairs(body.messages) do
+            if type(message) == "table" then
+                local content = flatten(message.content)
+                if content then
+                    core.table.insert(messages, {
+                        role = message.role,
+                        content = content,
+                    })
+                end
+            end
+        end
+    end
+    return messages
+end
+
+
+--- Prepend messages to the request body.
+--
+-- The Anthropic Messages API has no "system" role inside `messages`. The
+-- system prompt lives in the top-level `system` field. Prepended messages
+-- with role "system" and string content are therefore moved to the front
+-- of `body.system` (a string, or a list of text blocks) instead of being
+-- inserted into `messages`. All other messages go before the existing
+-- ones, keeping their relative order.
+-- @param body table Decoded request body (mutated in place)
+-- @param msgs table|nil List of {role, content} messages
+function _M.prepend_messages(body, msgs)
+    if not msgs or #msgs == 0 then return end
+    if not body.messages then
+        body.messages = {}
+    end
+
+    local system_texts = {}
+    local new_messages = {}
+    for _, msg in ipairs(msgs) do
+        if msg.role == "system" and type(msg.content) == "string" then
+            core.table.insert(system_texts, msg.content)
+        else
+            core.table.insert(new_messages, {role = msg.role, content = msg.content})
+        end
+    end
+    for _, msg in ipairs(body.messages) do
+        core.table.insert(new_messages, msg)
+    end
+    body.messages = new_messages
+
+    if #system_texts == 0 then
+        return
+    end
+    local prefix = table.concat(system_texts, "\n")
+    if type(body.system) == "table" then
+        table.insert(body.system, 1, {type = "text", text = prefix})
+    elseif type(body.system) == "string" and body.system ~= "" then
+        body.system = prefix .. "\n" .. body.system
+    else
+        body.system = prefix
+    end
+end
+
+
+--- Append messages to the request body.
+--
+-- The Anthropic Messages API has no "system" role inside `messages`.
+-- Appended messages with role "system" and string content are therefore
+-- added to the end of the top-level `body.system` (a string, or a list of
+-- text blocks). All other messages are appended to `messages` in order.
+-- @param body table Decoded request body (mutated in place)
+-- @param msgs table|nil List of {role, content} messages
+function _M.append_messages(body, msgs)
+    if not msgs or #msgs == 0 then return end
+    if not body.messages then
+        body.messages = {}
+    end
+
+    local system_texts = {}
+    for _, msg in ipairs(msgs) do
+        if msg.role == "system" and type(msg.content) == "string" then
+            core.table.insert(system_texts, msg.content)
+        else
+            core.table.insert(body.messages, {role = msg.role, content = msg.content})
+        end
+    end
+
+    if #system_texts == 0 then
+        return
+    end
+    local suffix = table.concat(system_texts, "\n")
+    if type(body.system) == "table" then
+        core.table.insert(body.system, {type = "text", text = suffix})
+    elseif type(body.system) == "string" and body.system ~= "" then
+        body.system = body.system .. "\n" .. suffix
+    else
+        body.system = suffix
+    end
+end
+
+
+--- Return the request's raw `messages` list, for logging.
+-- @param body table Decoded request body
+-- @return any body.messages exactly as sent by the client
+function _M.get_request_content(body)
+    local messages = body.messages
+    return messages
+end
+--- Build a deny response in Anthropic Messages format containing `opts.text`.
+-- @param opts table {text=string, model=string|nil, usage=table|nil,
+--   stream=boolean|nil}
+-- @return string For streaming requests, the complete SSE sequence
+--   message_start, content_block_start, content_block_delta,
+--   content_block_stop, message_delta, message_stop. Otherwise, the JSON
+--   encoding of a non-streaming Messages response.
+function _M.build_deny_response(opts)
+    if not opts.stream then
+        return core.json.encode({
+            id = uuid.generate_v4(),
+            type = "message",
+            role = "assistant",
+            model = opts.model,
+            content = {{
+                type = "text",
+                text = opts.text,
+            }},
+            stop_reason = "end_turn",
+            usage = opts.usage,
+        })
+    end
+
+    -- An empty Lua table JSON-encodes as `{}`, but message_start's
+    -- `message.content` is a list. Mark it as an empty array so it
+    -- encodes as `[]`.
+    local message_start = {
+        type = "message_start",
+        message = {
+            id = uuid.generate_v4(),
+            type = "message",
+            role = "assistant",
+            model = opts.model,
+            content = setmetatable({}, core.json.empty_array_mt),
+            usage = opts.usage,
+        },
+    }
+    local content_block_start = {
+        type = "content_block_start",
+        index = 0,
+        content_block = { type = "text", text = "" },
+    }
+    local content_block_delta = {
+        type = "content_block_delta",
+        index = 0,
+        delta = { type = "text_delta", text = opts.text },
+    }
+    local content_block_stop = {
+        type = "content_block_stop",
+        index = 0,
+    }
+    local message_delta = {
+        type = "message_delta",
+        delta = { stop_reason = "end_turn" },
+        usage = opts.usage,
+    }
+    return sse.encode({ type = "message_start",
+                        data = core.json.encode(message_start) })
+        .. sse.encode({ type = "content_block_start",
+                        data = core.json.encode(content_block_start) })
+        .. sse.encode({ type = "content_block_delta",
+                        data = core.json.encode(content_block_delta) })
+        .. sse.encode({ type = "content_block_stop",
+                        data = core.json.encode(content_block_stop) })
+        .. sse.encode({ type = "message_delta",
+                        data = core.json.encode(message_delta) })
+        .. sse.encode({ type = "message_stop", data = "{}" })
+end
+
+
+--- Return a zeroed usage table using Anthropic field names.
+-- @return table {input_tokens = 0, output_tokens = 0}
+function _M.empty_usage()
+    return {
+        input_tokens = 0,
+        output_tokens = 0,
+    }
+end
+
+
+-- SSE event types treated as data events.
+local DATA_EVENT_TYPES = {
+    content_block_delta = true,
+    message_delta = true,
+}
+
+--- Tell whether an SSE event is a data event (carries parseable content).
+-- @param event table SSE event with a `type` field
+-- @return boolean
+function _M.is_data_event(event)
+    return DATA_EVENT_TYPES[event.type] == true
+end
+
+
+--- Tell whether an SSE event terminates the Anthropic stream.
+-- @param event table SSE event with a `type` field
+-- @return boolean true for `message_stop`
+function _M.is_done_event(event)
+    local event_type = event.type
+    return event_type == "message_stop"
+end
+
+
+--- Encode the terminal `message_stop` SSE event.
+-- @return string Encoded SSE event text
+function _M.build_done_event()
+    local stop_event = { type = "message_stop", data = "{}" }
+    return sse.encode(stop_event)
+end
+
+
+return _M
diff --git 
a/apisix/plugins/ai-protocols/converters/anthropic-messages-to-openai-chat.lua 
b/apisix/plugins/ai-protocols/converters/anthropic-messages-to-openai-chat.lua
new file mode 100644
index 000000000..518aba8d4
--- /dev/null
+++ 
b/apisix/plugins/ai-protocols/converters/anthropic-messages-to-openai-chat.lua
@@ -0,0 +1,447 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--- Converter: Anthropic Messages → OpenAI Chat Completions.
+-- Converts client requests from Anthropic Messages API format to
+-- OpenAI Chat Completions format, and converts provider responses
+-- back from OpenAI to Anthropic format.
+--
+-- Converters work DOWNSTREAM of adapters: the target adapter (openai-chat)
+-- parses the provider's response, and this converter transforms the parsed
+-- result into the client's format (Anthropic Messages).
+
+local core = require("apisix.core")
+local table = table
+local type = type
+local ipairs = ipairs
+local tostring = tostring
+local setmetatable = setmetatable
+
+local _M = {
+    from = "anthropic-messages",
+    to = "openai-chat",
+}
+
+
+-- SSE event helpers
+
+--- Wrap a payload as an SSE event table whose `data` is the payload's
+-- JSON encoding.
+-- @param event_type string Event type
+-- @param data table Payload to JSON-encode
+-- @return table {type=event_type, data=string}
+local function make_sse_event(event_type, data)
+    local encoded = core.json.encode(data)
+    return { type = event_type, data = encoded }
+end
+
+--- Append a `content_block_stop` event for block `idx` to `events`.
+local function push_content_block_stop(events, idx)
+    local payload = { type = "content_block_stop", index = idx }
+    events[#events + 1] = make_sse_event("content_block_stop", payload)
+end
+
+--- Append a `content_block_start` event that opens block `idx` with the
+-- initial content `block` to `events`.
+local function push_content_block_start(events, idx, block)
+    local payload = {
+        type = "content_block_start",
+        index = idx,
+        content_block = block,
+    }
+    events[#events + 1] = make_sse_event("content_block_start", payload)
+end
+
+--- Append a `content_block_delta` event carrying `delta` for block `idx`
+-- to `events`.
+local function push_content_block_delta(events, idx, delta)
+    local payload = {
+        type = "content_block_delta",
+        index = idx,
+        delta = delta,
+    }
+    events[#events + 1] = make_sse_event("content_block_delta", payload)
+end
+
+-- OpenAI `finish_reason` -> Anthropic `stop_reason`. Callers fall back
+-- to "end_turn" for reasons not listed here.
+local openai_stop_reason_map = {
+    stop           = "end_turn",
+    content_filter = "end_turn",
+    length         = "max_tokens",
+    tool_calls     = "tool_use",
+}
+
+
+-- Flatten an Anthropic `system` value (string or list of text blocks) into
+-- one string. Text blocks are concatenated without a separator.
+local function convert_system(system)
+    if type(system) == "string" then
+        return system
+    end
+    local parts = {}
+    if type(system) == "table" then
+        for _, block in ipairs(system) do
+            if type(block) == "table" and block.type == "text"
+                    and type(block.text) == "string" then
+                parts[#parts + 1] = block.text
+            end
+        end
+    end
+    return table.concat(parts)
+end
+
+
+-- Render an Anthropic tool_result `content` as the string OpenAI expects in
+-- a `tool` message. A list made only of text blocks is joined into plain
+-- text. Any other table is JSON-encoded, and scalars are stringified.
+local function tool_result_content(content)
+    if type(content) ~= "table" then
+        return tostring(content or "")
+    end
+    local texts = {}
+    for _, part in ipairs(content) do
+        if type(part) ~= "table" or part.type ~= "text"
+                or type(part.text) ~= "string" then
+            return core.json.encode(content)
+        end
+        texts[#texts + 1] = part.text
+    end
+    if #texts == 0 then
+        return core.json.encode(content)
+    end
+    return table.concat(texts, "\n")
+end
+
+
+-- Split an Anthropic content-block list into three parts: the concatenated
+-- text, OpenAI `tool_calls` (from tool_use blocks), and OpenAI `tool`
+-- messages (from tool_result blocks). Non-table blocks are logged and
+-- skipped.
+local function split_content_blocks(blocks)
+    local texts = {}
+    local tool_calls = {}
+    local tool_results = {}
+
+    for _, block in ipairs(blocks) do
+        if type(block) ~= "table" then
+            core.log.warn("unexpected non-table content block in Anthropic ",
+                          "request, skipping: ", tostring(block))
+        elseif block.type == "text" and type(block.text) == "string" then
+            texts[#texts + 1] = block.text
+        elseif block.type == "tool_use" then
+            if type(block.id) == "string" and type(block.name) == "string" then
+                table.insert(tool_calls, {
+                    id = block.id,
+                    type = "function",
+                    ["function"] = {
+                        name = block.name,
+                        arguments = core.json.encode(block.input or {})
+                    }
+                })
+            end
+        elseif block.type == "tool_result" then
+            if type(block.tool_use_id) == "string" then
+                table.insert(tool_results, {
+                    role = "tool",
+                    tool_call_id = block.tool_use_id,
+                    content = tool_result_content(block.content),
+                })
+            end
+        end
+    end
+
+    return table.concat(texts), tool_calls, tool_results
+end
+
+
+-- Convert Anthropic tool definitions into OpenAI function tools.
+-- Returns nil plus an error message for the first invalid definition.
+local function convert_tools(tools)
+    local openai_tools = {}
+    for i, tool in ipairs(tools) do
+        if type(tool) ~= "table" or type(tool.name) ~= "string"
+                or tool.name == "" then
+            return nil, "invalid tool definition at index " .. i
+        end
+        table.insert(openai_tools, {
+            type = "function",
+            ["function"] = {
+                name = tool.name,
+                description = tool.description,
+                parameters = tool.input_schema
+            }
+        })
+    end
+    return openai_tools
+end
+
+
+-- Map an Anthropic `tool_choice` object ({type = "auto"|"any"|"none"|"tool",
+-- name}) to its OpenAI equivalent. Returns nil for unknown shapes, which
+-- drops the field rather than forwarding an Anthropic-shaped value.
+local function convert_tool_choice(choice)
+    if choice.type == "auto" then
+        return "auto"
+    elseif choice.type == "any" then
+        return "required"
+    elseif choice.type == "none" then
+        return "none"
+    elseif choice.type == "tool" and type(choice.name) == "string" then
+        return { type = "function", ["function"] = { name = choice.name } }
+    end
+    return nil
+end
+
+
+--- Convert an incoming Anthropic Messages request to OpenAI Chat format.
+-- @param request_table table Decoded Anthropic request body (not mutated;
+--   a shallow clone is modified instead)
+-- @param ctx table Request context (unused)
+-- @return table|nil OpenAI Chat request body
+-- @return string|nil Error message when the request is invalid
+function _M.convert_request(request_table, ctx)
+    if type(request_table) ~= "table" then
+        return nil, "request body must be a table"
+    end
+
+    if type(request_table.messages) ~= "table" or
+       #request_table.messages == 0 then
+        return nil, "missing messages"
+    end
+
+    local openai_body = core.table.clone(request_table)
+
+    -- 1. The top-level system prompt becomes a leading "system" message.
+    local messages = {}
+    if request_table.system then
+        local system_content = convert_system(request_table.system)
+        if system_content ~= "" then
+            table.insert(messages, { role = "system", content = system_content })
+        end
+        openai_body.system = nil
+    end
+
+    -- 2. Convert messages, including tool calls and tool results.
+    for i, msg in ipairs(request_table.messages) do
+        if type(msg) ~= "table" or type(msg.role) ~= "string" then
+            return nil, "invalid message at index " .. i
+        end
+
+        local content = msg.content
+        if type(content) == "string" then
+            table.insert(messages, { role = msg.role, content = content })
+        elseif type(content) == "table" then
+            local text, tool_calls, tool_results = split_content_blocks(content)
+
+            -- OpenAI requires `tool` messages to directly follow the
+            -- assistant message that issued the matching tool_calls. Emit
+            -- tool results before any text that shares the Anthropic message.
+            for _, tr in ipairs(tool_results) do
+                table.insert(messages, tr)
+            end
+
+            if #tool_calls > 0 then
+                table.insert(messages, {
+                    role = msg.role,
+                    content = text ~= "" and text or nil,
+                    tool_calls = tool_calls,
+                })
+            elseif #tool_results == 0 or text ~= "" then
+                table.insert(messages, { role = msg.role, content = text })
+            end
+        else
+            return nil, "invalid message content at index " .. i
+        end
+    end
+    openai_body.messages = messages
+
+    -- 3. Convert tool definitions.
+    if type(request_table.tools) == "table" then
+        local tools, err = convert_tools(request_table.tools)
+        if not tools then
+            return nil, err
+        end
+        openai_body.tools = tools
+    end
+
+    -- 4. Convert an Anthropic-shaped tool_choice object.
+    if type(request_table.tool_choice) == "table" then
+        openai_body.tool_choice = convert_tool_choice(request_table.tool_choice)
+    end
+
+    -- 5. Map parameters.
+    if openai_body.max_tokens then
+        openai_body.max_completion_tokens = openai_body.max_tokens
+    end
+
+    if openai_body.stop_sequences then
+        openai_body.stop = openai_body.stop_sequences
+        openai_body.stop_sequences = nil
+    end
+
+    return openai_body
+end
+
+
+-- Decode the JSON `arguments` string of an OpenAI tool call.
+-- Missing, non-string or blank arguments yield an empty input object.
+-- Returns nil plus an error message when the JSON is invalid.
+local function decode_tool_arguments(fn)
+    local args = type(fn) == "table" and fn.arguments
+    if type(args) ~= "string" or args:match("^%s*$") then
+        return {}
+    end
+    local decoded, err = core.json.decode(args)
+    if decoded == nil then
+        return nil, "invalid tool_call arguments: " .. (err or "decode error")
+    end
+    return decoded
+end
+
+
+--- Convert a non-streaming OpenAI Chat response to Anthropic format.
+-- Values that are not tables (e.g. JSON null) in `choices`, `message`,
+-- `usage` and `prompt_tokens_details` are treated as absent.
+-- @param res_body table Decoded OpenAI response body
+-- @param ctx table Request context; ctx.var.llm_model overrides the model
+-- @return table|nil Anthropic Messages response
+-- @return string|nil Error message on failure
+function _M.convert_response(res_body, ctx)
+    if type(res_body) ~= "table" then
+        return nil, "response body must be a table"
+    end
+
+    local choice = type(res_body.choices) == "table" and res_body.choices[1]
+    if type(choice) ~= "table" then
+        return nil, "no choices in response"
+    end
+
+    local model = ctx and ctx.var and ctx.var.llm_model
+    local message = type(choice.message) == "table" and choice.message or {}
+
+    local content = {}
+    if type(message.content) == "string" and message.content ~= "" then
+        table.insert(content, { type = "text", text = message.content })
+    end
+
+    if type(message.tool_calls) == "table" then
+        for _, tc in ipairs(message.tool_calls) do
+            if type(tc) == "table" then
+                local fn = tc["function"]
+                local input, err = decode_tool_arguments(fn)
+                if input == nil then
+                    return nil, err
+                end
+                table.insert(content, {
+                    type = "tool_use",
+                    id = tc.id or "",
+                    name = (type(fn) == "table" and fn.name) or "",
+                    input = input
+                })
+            end
+        end
+    end
+
+    if #content == 0 then
+        content = {{ type = "text", text = "" }}
+    end
+
+    local usage = type(res_body.usage) == "table" and res_body.usage or {}
+    local anthropic_res = {
+        id = res_body.id,
+        type = "message",
+        role = "assistant",
+        model = model or res_body.model,
+        content = content,
+        stop_reason = openai_stop_reason_map[choice.finish_reason] or "end_turn",
+        usage = {
+            input_tokens = usage.prompt_tokens or 0,
+            output_tokens = usage.completion_tokens or 0,
+        }
+    }
+
+    local details = usage.prompt_tokens_details
+    if type(details) == "table" then
+        anthropic_res.usage.cache_read_input_tokens = details.cached_tokens or 0
+    end
+
+    return anthropic_res
+end
+
+
+--- Convert one OpenAI chat-completion SSE chunk into Anthropic SSE events.
+--
+-- Stateful across a single stream. Fields read and written on `state`:
+--   is_first              must be truthy on the first call, when
+--                         message_start and text block 0 are emitted.
+--                         Presumably seeded to true by the caller; confirm.
+--   content_index         index of the most recently opened content block
+--   current_open_block    index of the block still open, or nil
+--   tool_call_indices     OpenAI tool_call index -> Anthropic block index
+--   is_done               set once a chunk carries finish_reason
+--   pending_stop          true while message_delta/message_stop are deferred
+--   pending_message_delta the deferred message_delta payload
+--
+-- @param openai_chunk table Decoded OpenAI SSE chunk
+-- @param state table Mutable per-stream state (see above)
+-- @return table List of events from make_sse_event (possibly empty)
+local function openai_to_anthropic_sse(openai_chunk, state)
+    if type(openai_chunk) ~= "table" then
+        return {}
+    end
+    if type(state) ~= "table" then
+        return {}
+    end
+    local events = {}
+    local choice = openai_chunk.choices and openai_chunk.choices[1]
+
+    -- If finish_reason was seen, we deferred message_delta+message_stop to allow
+    -- a trailing usage-only chunk to be merged in. Flush now.
+    if state.is_done then
+        if state.pending_stop then
+            local message_delta = state.pending_message_delta
+            -- Usage that arrived with the finish chunk wins; otherwise take
+            -- it from this trailing chunk.
+            if type(openai_chunk.usage) == "table" and not message_delta.usage then
+                message_delta.usage = {
+                    input_tokens  = openai_chunk.usage.prompt_tokens or 0,
+                    output_tokens = openai_chunk.usage.completion_tokens or 0,
+                }
+            end
+            table.insert(events, make_sse_event("message_delta", message_delta))
+            table.insert(events, make_sse_event("message_stop", { type = "message_stop" }))
+            state.pending_stop = false
+            state.pending_message_delta = nil
+        end
+        -- After the flush, further chunks produce no events.
+        return events
+    end
+
+    -- 1. Initialize message on first chunk
+    if state.is_first then
+        local message = {
+            id = openai_chunk.id,
+            type = "message",
+            role = "assistant",
+            model = openai_chunk.model,
+            content = {},
+            usage = { input_tokens = 0, output_tokens = 0 }
+        }
+        -- Make the empty `content` encode as a JSON array rather than object.
+        setmetatable(message.content, core.json.empty_array_mt)
+
+        table.insert(events, make_sse_event("message_start", {
+            type = "message_start",
+            message = message,
+        }))
+        -- Block 0 is always opened as a text block, even when the model
+        -- goes on to emit only tool calls.
+        push_content_block_start(events, 0, { type = "text", text = "" })
+
+        state.is_first = false
+        state.content_index = 0
+        state.current_open_block = 0
+        state.tool_call_indices = {}
+    end
+
+    -- 2. Handle text content delta
+    -- NOTE(review): text is always sent to block 0, even after block 0 has
+    -- been stopped because a tool_use block was opened. Text arriving after
+    -- a tool call would therefore target a closed block.
+    if choice and choice.delta and type(choice.delta.content) == "string"
+            and choice.delta.content ~= "" then
+        push_content_block_delta(events, 0, {
+            type = "text_delta",
+            text = choice.delta.content,
+        })
+    end
+
+    -- 3. Handle tool_calls deltas
+    if choice and choice.delta and type(choice.delta.tool_calls) == "table" then
+        for _, tc_delta in ipairs(choice.delta.tool_calls) do
+            if type(tc_delta) ~= "table" then
+                goto continue_tc
+            end
+            local tc_idx = tc_delta.index
+            if tc_idx == nil then
+                goto continue_tc
+            end
+
+            -- First fragment of a new OpenAI tool call: close whichever
+            -- block is open and open a tool_use block at the next index.
+            if not state.tool_call_indices[tc_idx] then
+                if state.current_open_block ~= nil then
+                    push_content_block_stop(events, state.current_open_block)
+                end
+                state.content_index = state.content_index + 1
+                state.tool_call_indices[tc_idx] = state.content_index
+                state.current_open_block = state.content_index
+
+                local fn = tc_delta["function"] or {}
+                push_content_block_start(events, state.content_index, {
+                    type  = "tool_use",
+                    id    = tc_delta.id or "",
+                    name  = fn.name or "",
+                    input = {},
+                })
+            end
+
+            -- Argument fragments are forwarded verbatim as partial JSON.
+            local fn = tc_delta["function"]
+            local args = fn and fn.arguments
+            if type(args) == "string" and args ~= "" then
+                push_content_block_delta(events, state.tool_call_indices[tc_idx], {
+                    type         = "input_json_delta",
+                    partial_json = args,
+                })
+            end
+
+            ::continue_tc::
+        end
+    end
+
+    -- 4. Handle stream completion
+    -- Close the open block, then defer message_delta/message_stop so the
+    -- flush branch at the top can merge usage from a trailing chunk.
+    if choice and type(choice.finish_reason) == "string" then
+        if state.current_open_block ~= nil then
+            push_content_block_stop(events, state.current_open_block)
+            state.current_open_block = nil
+        end
+
+        local message_delta = {
+            type = "message_delta",
+            delta = {
+                stop_reason = openai_stop_reason_map[choice.finish_reason] or "end_turn",
+            },
+        }
+
+        if type(openai_chunk.usage) == "table" then
+            message_delta.usage = {
+                input_tokens  = openai_chunk.usage.prompt_tokens or 0,
+                output_tokens = openai_chunk.usage.completion_tokens or 0,
+            }
+        end
+
+        state.pending_message_delta = message_delta
+        state.pending_stop = true
+        state.is_done = true
+    end
+
+    return events
+end
+
+
+--- Convert parsed SSE events (from openai-chat adapter) to Anthropic format.
+-- Called with the result of openai_chat_adapter.parse_sse_event().
+-- @param parsed table|nil Parsed SSE event from the target adapter
+-- @param _ table Request context (unused)
+-- @param state table Mutable converter state shared across the stream
+-- @return table|nil List of Anthropic SSE events to send to the client
+function _M.convert_sse_events(parsed, _, state)
+    if not parsed or parsed.type == "skip" then
+        return nil
+    end
+
+    if parsed.type == "done" then
+        if type(state) ~= "table" then
+            return nil
+        end
+
+        local events
+        -- The message was started but no chunk ever carried finish_reason
+        -- (the upstream stream just ended). Close it with a synthetic "stop"
+        -- so the client still receives content_block_stop, message_delta
+        -- and message_stop.
+        if state.is_first == false and not state.is_done then
+            events = openai_to_anthropic_sse({
+                choices = {{ delta = {}, finish_reason = "stop" }},
+            }, state)
+        end
+
+        -- Flush any deferred message_delta + message_stop.
+        if state.pending_stop then
+            local flushed = openai_to_anthropic_sse({ choices = {} }, state)
+            events = events or {}
+            for _, ev in ipairs(flushed) do
+                table.insert(events, ev)
+            end
+        end
+        return events
+    end
+
+    if parsed.data then
+        return openai_to_anthropic_sse(parsed.data, state)
+    end
+
+    return nil
+end
+
+
+return _M
diff --git a/apisix/plugins/ai-protocols/converters/init.lua 
b/apisix/plugins/ai-protocols/converters/init.lua
index f0dba21b0..cbe0e2261 100644
--- a/apisix/plugins/ai-protocols/converters/init.lua
+++ b/apisix/plugins/ai-protocols/converters/init.lua
@@ -62,6 +62,9 @@ end
 -- To add a new converter, create a module file and add one line here.
 ---------------------------------------------------------------------
 
+register(require(
+    
"apisix.plugins.ai-protocols.converters.anthropic-messages-to-openai-chat"))
+
 register(require(
     
"apisix.plugins.ai-protocols.converters.openai-embeddings-to-vertex-predict"))
 
diff --git a/apisix/plugins/ai-protocols/init.lua 
b/apisix/plugins/ai-protocols/init.lua
index c290292c5..583c7e4aa 100644
--- a/apisix/plugins/ai-protocols/init.lua
+++ b/apisix/plugins/ai-protocols/init.lua
@@ -29,10 +29,12 @@ local _M = {}
 local registered = {
     ["openai-chat"] = require("apisix.plugins.ai-protocols.openai-chat"),
     ["openai-embeddings"] = 
require("apisix.plugins.ai-protocols.openai-embeddings"),
+    ["anthropic-messages"] = 
require("apisix.plugins.ai-protocols.anthropic-messages"),
 }
 
--- Detection order: body-only (chat, embeddings).
+-- Detection order: URL+body first (anthropic), then body-only (chat, embeddings).
 local detection_order = {
+    { name = "anthropic-messages", protocol = registered["anthropic-messages"] 
},
     { name = "openai-chat",       protocol = registered["openai-chat"] },
     { name = "openai-embeddings", protocol = registered["openai-embeddings"] },
 }
@@ -41,7 +43,7 @@ local detection_order = {
 --- Detect the client protocol by asking each protocol if it matches.
 -- @param body table The parsed request body
 -- @param ctx table The request context
--- @return string Protocol name: "openai-chat" | "openai-embeddings"
+-- @return string Protocol name: "openai-chat" | "openai-embeddings" | 
"anthropic-messages"
 function _M.detect(body, ctx)
     for _, entry in ipairs(detection_order) do
         if entry.protocol.matches(body, ctx) then
diff --git a/apisix/plugins/ai-protocols/openai-chat.lua 
b/apisix/plugins/ai-protocols/openai-chat.lua
index 22e1ee881..1875b4908 100644
--- a/apisix/plugins/ai-protocols/openai-chat.lua
+++ b/apisix/plugins/ai-protocols/openai-chat.lua
@@ -41,15 +41,14 @@ function _M.is_streaming(body)
 end
 
 
---- Prepare the request body for sending.
--- Injects stream_options if streaming, extracts model info.
--- @return table body (possibly mutated)
--- @return string|nil model from request body
-function _M.prepare_request(body, ctx, opts)
+--- Prepare the outgoing request body for the target provider.
+-- Injects stream_options so the provider includes usage in streaming responses.
+-- Called after protocol conversion in build_request(), covering both passthrough
+-- and convert scenarios.
+function _M.prepare_outgoing_request(body)
     if body.stream then
         body.stream_options = { include_usage = true }
     end
-    return body, body.model
 end
 
 
diff --git a/apisix/plugins/ai-protocols/openai-embeddings.lua b/apisix/plugins/ai-protocols/openai-embeddings.lua
index b2da98ef9..51feb24a5 100644
--- a/apisix/plugins/ai-protocols/openai-embeddings.lua
+++ b/apisix/plugins/ai-protocols/openai-embeddings.lua
@@ -36,10 +36,6 @@ function _M.is_streaming(_)
 end
 
 
-function _M.prepare_request(body, _, _)
-    return body, body and body.model
-end
-
 
 function _M.extract_usage(res_body)
     if not res_body or not res_body.usage then
diff --git a/apisix/plugins/ai-providers/anthropic.lua b/apisix/plugins/ai-providers/anthropic.lua
index 6c4afafdb..2e79e5da8 100644
--- a/apisix/plugins/ai-providers/anthropic.lua
+++ b/apisix/plugins/ai-providers/anthropic.lua
@@ -21,6 +21,7 @@ return require("apisix.plugins.ai-providers.base").new(
         port = 443,
         capabilities = {
             ["openai-chat"] = { path = "/v1/chat/completions" },
+            ["anthropic-messages"] = { path = "/v1/messages" },
         },
     }
 )
diff --git a/apisix/plugins/ai-providers/base.lua b/apisix/plugins/ai-providers/base.lua
index b5f79a493..4a69f9eaf 100644
--- a/apisix/plugins/ai-providers/base.lua
+++ b/apisix/plugins/ai-providers/base.lua
@@ -35,6 +35,7 @@ local sse  = require("apisix.plugins.ai-transport.sse")
 local transport_http = require("apisix.plugins.ai-transport.http")
 local transport_auth = require("apisix.plugins.ai-transport.auth")
 local log_sanitize = require("apisix.utils.log-sanitize")
+local protocols = require("apisix.plugins.ai-protocols")
 local ngx = ngx
 local ngx_now = ngx.now
 
@@ -93,6 +94,16 @@ function _M.build_request(self, ctx, conf, request_body, opts)
         request_body = converted
     end
 
+    -- Inject target-protocol-specific parameters (e.g. stream_options for OpenAI).
+    -- This runs after conversion so it covers both passthrough and convert scenarios.
+    local target_protocol = ctx.ai_target_protocol
+    if target_protocol then
+        local target_proto = protocols.get(target_protocol)
+        if target_proto and target_proto.prepare_outgoing_request then
+            target_proto.prepare_outgoing_request(request_body)
+        end
+    end
+
     core.log.info("request extra_opts to LLM server: ",
                   core.json.delay_encode(log_sanitize.redact_extra_opts(opts), true))
 
diff --git a/apisix/plugins/ai-proxy/base.lua b/apisix/plugins/ai-proxy/base.lua
index 0960366eb..3cf054e45 100644
--- a/apisix/plugins/ai-proxy/base.lua
+++ b/apisix/plugins/ai-proxy/base.lua
@@ -100,8 +100,8 @@ end
 -- Execute the AI proxy pipeline:
 --   1. Validate request
 --   2. Route client protocol to driver capability (passthrough / convert / error)
---   3. Prepare request via protocol (stream_options, model extraction)
---   4. Build HTTP request (protocol conversion, auth, headers)
+--   3. Extract model from request body
+--   4. Build HTTP request (protocol conversion, target protocol params, auth, headers)
 --   5. Send via transport
 --   6. Parse response (streaming or non-streaming)
 --   7. Set keepalive
@@ -156,11 +156,10 @@ function _M.before_proxy(conf, ctx, on_error)
             target_proto = target_protocol
         end
         ctx.ai_converter = converter
+        ctx.ai_target_protocol = target_proto
 
-        -- Step 2: Prepare request via protocol
-        local request_model
-        request_body, request_model = client_proto.prepare_request(
-            request_body, ctx, extra_opts)
+        -- Step 2: Extract model from request
+        local request_model = request_body.model
 
         if request_model then
             ctx.var.request_llm_model = request_model
diff --git a/t/APISIX.pm b/t/APISIX.pm
index ac0edef49..78e676c9d 100644
--- a/t/APISIX.pm
+++ b/t/APISIX.pm
@@ -777,6 +777,22 @@ _EOC_
         $ipv6_listen_conf = "listen \[::1\]:1984;"
     }
 
+    my $enable_test_control_api_v1 =
+        !defined($ENV{TEST_ENABLE_CONTROL_API_V1}) ||
+        $ENV{TEST_ENABLE_CONTROL_API_V1} ne "0";
+
+    my $control_api_v1_location = "";
+    if ($enable_test_control_api_v1) {
+        $control_api_v1_location = <<_EOC_;
+        location /v1/ {
+            content_by_lua_block {
+                apisix.http_control()
+            }
+        }
+
+_EOC_
+    }
+
     my $config = $block->config // '';
     $config .= <<_EOC_;
         $ipv6_listen_conf
@@ -825,11 +841,7 @@ _EOC_
             }
         }
 
-        location /v1/ {
-            content_by_lua_block {
-                apisix.http_control()
-            }
-        }
+        $control_api_v1_location
 
         location / {
             set \$upstream_mirror_host        '';
diff --git a/t/plugin/ai-proxy-protocol-conversion.t b/t/plugin/ai-proxy-protocol-conversion.t
new file mode 100644
index 000000000..a6ee0f2e2
--- /dev/null
+++ b/t/plugin/ai-proxy-protocol-conversion.t
@@ -0,0 +1,1384 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+BEGIN {
+    $ENV{TEST_ENABLE_CONTROL_API_V1} = "0";
+}
+
+use t::APISIX 'no_plan';
+
+log_level("info");
+repeat_each(1);
+no_long_string();
+no_root_location();
+
+
+my $resp_file = 't/assets/ai-proxy-response.json';
+open(my $fh, '<', $resp_file) or die "Could not open file '$resp_file' $!";
+my $resp = do { local $/; <$fh> };
+close($fh);
+
+
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    if (!defined $block->request) {
+        $block->set_value("request", "GET /t");
+    }
+
+    my $http_config = $block->http_config // <<_EOC_;
+        server {
+            server_name openai;
+            listen 6724;
+
+            default_type 'application/json';
+
+            location /v1/chat/completions {
+                content_by_lua_block {
+                    local json = require("cjson.safe")
+
+                    if ngx.req.get_method() ~= "POST" then
+                        ngx.status = 400
+                        ngx.say("Unsupported request method: ", ngx.req.get_method())
+                    end
+                    ngx.req.read_body()
+                    local body, err = ngx.req.get_body_data()
+                    body, err = json.decode(body)
+
+                    local test_type = ngx.req.get_headers()["test-type"]
+                    if test_type == "options" then
+                        if body.foo == "bar" then
+                            ngx.status = 200
+                            ngx.say("options works")
+                        else
+                            ngx.status = 500
+                            ngx.say("model options feature doesn't work")
+                        end
+                        return
+                    end
+
+                    if test_type == "tools" then
+                        -- Verify request was converted from Anthropic to OpenAI tools format
+                        local tool = body and body.tools and body.tools[1]
+                        if not tool or tool.type ~= "function" or
+                           not tool["function"] or tool["function"].name ~= "get_weather" then
+                            ngx.status = 400
+                            ngx.say([[{"error": "tool not converted to openai format"}]])
+                            return
+                        end
+                        ngx.status = 200
+                        ngx.say([[{
+                            "id": "chatcmpl-tool",
+                            "object": "chat.completion",
+                            "model": "gpt-4o",
+                            "choices": [{
+                                "index": 0,
+                                "message": {
+                                    "role": "assistant",
+                                    "content": null,
+                                    "tool_calls": [{
+                                        "id": "call_abc123",
+                                        "type": "function",
+                                        "function": {
+                                            "name": "get_weather",
+                                            "arguments": "{\\\"location\\\": \\\"Paris\\\"}"
+                                        }
+                                    }]
+                                },
+                                "finish_reason": "tool_calls"
+                            }],
+                            "usage": {"prompt_tokens": 20, "completion_tokens": 10, "total_tokens": 30}
+                        }]])
+                        return
+                    end
+
+                    local header_auth = ngx.req.get_headers()["authorization"]
+                    local query_auth = ngx.req.get_uri_args()["apikey"]
+
+                    if header_auth ~= "Bearer token" and query_auth ~= "apikey" then
+                        ngx.status = 401
+                        ngx.say("Unauthorized")
+                        return
+                    end
+
+                    if header_auth == "Bearer token" or query_auth == "apikey" then
+                        ngx.req.read_body()
+                        local body, err = ngx.req.get_body_data()
+                        body, err = json.decode(body)
+
+                        if not body.messages or #body.messages < 1 then
+                            ngx.status = 400
+                            ngx.say([[{ "error": "bad request"}]])
+                            return
+                        end
+                        if body.messages[1].content == "write an SQL query to get all rows from student table" then
+                            ngx.print("SELECT * FROM STUDENTS")
+                            return
+                        end
+
+                        ngx.status = 200
+                        ngx.say([[$resp]])
+                        return
+                    end
+
+
+                    ngx.status = 503
+                    ngx.say("reached the end of the test suite")
+                }
+            }
+
+            location /random {
+                content_by_lua_block {
+                    ngx.say("path override works")
+                }
+            }
+        }
+_EOC_
+
+    $block->set_value("http_config", $http_config);
+});
+
+run_tests();
+
+__DATA__
+
+=== TEST 1: Set up route – Anthropic protocol auto-detected via exact /v1/messages URI (Non-stream)
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                    "uri": "/v1/messages",
+                    "plugins": {
+                        "ai-proxy": {
+                            "provider": "openai",
+                            "auth": {
+                                "header": {
+                                    "Authorization": "Bearer token"
+                                }
+                            },
+                            "override": {
+                                "endpoint": "http://localhost:6724/v1/chat/completions"
+                            }
+                        }
+                    }
+                }]]
+            )
+
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 2: Send Anthropic request to /v1/messages and verify protocol conversion
+--- request
+POST /v1/messages
+{ "model": "claude-3-5-sonnet-20241022", "messages": [ { "role": "user","content": "hello" } ] }
+--- more_headers
+Authorization: Bearer token
+--- error_code: 200
+--- response_body eval
+qr/"text":"1 \+ 1 = 2\."/
+
+
+
+=== TEST 3: Missing messages field returns 400
+--- request
+POST /v1/messages
+{ "model": "claude-3-5-sonnet-20241022" }
+--- more_headers
+Authorization: Bearer token
+--- error_code: 400
+--- response_body
+{"error_msg":"missing messages"}
+
+
+
+=== TEST 4: Malformed JSON body returns 400
+--- request
+POST /v1/messages
+this is not valid json
+--- more_headers
+Authorization: Bearer token
+Content-Type: application/json
+--- error_code: 400
+
+
+
+=== TEST 5: messages field is wrong type (non-array)
+--- request
+POST /v1/messages
+{ "model": "claude-3-5-sonnet-20241022", "messages": "hello" }
+--- more_headers
+Authorization: Bearer token
+--- error_code: 400
+--- response_body
+{"error_msg":"missing messages"}
+
+
+
+=== TEST 6: messages is an empty array
+--- request
+POST /v1/messages
+{ "model": "claude-3-5-sonnet-20241022", "messages": [] }
+--- more_headers
+Authorization: Bearer token
+--- error_code: 400
+--- response_body
+{"error_msg":"missing messages"}
+
+
+
+=== TEST 7: Set up route for stream test – exact URI /v1/messages triggers Anthropic detection
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                    "uri": "/v1/messages",
+                    "plugins": {
+                        "ai-proxy": {
+                            "provider": "openai",
+                            "auth": {
+                                "header": {
+                                    "Authorization": "Bearer token"
+                                }
+                            },
+                            "options": {
+                                "model": "claude-3-5-sonnet-20241022",
+                                "stream": true
+                            },
+                            "override": {
+                                "endpoint": "http://localhost:6724/v1/chat/completions"
+                            }
+                        }
+                    }
+                }]]
+            )
+
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 8: Send Anthropic stream request and verify SSE conversion
+--- http_config
+        server {
+            server_name openai;
+            listen 6724;
+
+            default_type 'application/json';
+
+            location /v1/chat/completions {
+                content_by_lua_block {
+                    local json = require("toolkit.json")
+                    ngx.req.read_body()
+                    local body = json.decode(ngx.req.get_body_data())
+
+                    if not body.stream then
+                        ngx.status = 400
+                        ngx.say("Expected stream=true")
+                        return
+                    end
+
+                    ngx.header["Content-Type"] = "text/event-stream"
+                    ngx.say("data: " .. json.encode({
+                        id = "chatcmpl-123",
+                        object = "chat.completion.chunk",
+                        model = "gpt-4o",
+                        choices = {{ index = 0, delta = { role = "assistant" }, finish_reason = nil }}
+                    }) .. "\n")
+                    ngx.flush(true)
+
+                    ngx.say("data: " .. json.encode({
+                        id = "chatcmpl-123",
+                        object = "chat.completion.chunk",
+                        model = "gpt-4o",
+                        choices = {{ index = 0, delta = { content = "Hello" }, finish_reason = nil }}
+                    }) .. "\n")
+                    ngx.flush(true)
+
+                    ngx.say("data: " .. json.encode({
+                        id = "chatcmpl-123",
+                        object = "chat.completion.chunk",
+                        model = "gpt-4o",
+                        choices = {{ index = 0, delta = { content = " world" }, finish_reason = "stop" }},
+                        usage = { prompt_tokens = 5, completion_tokens = 5, total_tokens = 10 }
+                    }) .. "\n")
+                    ngx.flush(true)
+
+                    ngx.say("data: [DONE]\n")
+                }
+            }
+        }
+--- config
+    location /t {
+        content_by_lua_block {
+            local http = require("resty.http")
+            local httpc = http.new()
+            local core = require("apisix.core")
+
+            local ok, err = httpc:connect({
+                scheme = "http",
+                host = "localhost",
+                port = ngx.var.server_port,
+            })
+
+            local res, err = httpc:request({
+                method = "POST",
+                path = "/v1/messages",
+                headers = { ["Content-Type"] = "application/json", ["Connection"] = "close" },
+                body = [[{
+                    "model": "claude-3-5-sonnet-20241022",
+                    "messages": [{"role": "user", "content": "Hi"}],
+                    "stream": true
+                }]],
+            })
+
+            local results = {}
+            while true do
+                local chunk, err = res.body_reader()
+                if not chunk then break end
+                table.insert(results, chunk)
+            end
+
+            ngx.print(table.concat(results, ""))
+        }
+    }
+--- error_code: 200
+--- response_body eval
+qr/event: message_start\ndata:.*?"type":"message_start".*?event: content_block_start\ndata:.*?event: content_block_delta\ndata:.*?"text":"Hello".*?event: content_block_delta\ndata:.*?"text":" world".*?event: content_block_stop\ndata:.*?event: message_delta\ndata:.*?event: message_stop\ndata:/s
+
+
+
+=== TEST 9: Set up route for system prompt conversion test
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                    "uri": "/v1/messages",
+                    "plugins": {
+                        "ai-proxy": {
+                            "provider": "openai",
+                            "auth": {
+                                "header": {
+                                    "Authorization": "Bearer token"
+                                }
+                            },
+                            "override": {
+                                "endpoint": "http://localhost:6724/v1/chat/completions"
+                            }
+                        }
+                    }
+                }]]
+            )
+
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 10: System prompt is converted to OpenAI messages[0] with role=system
+--- http_config
+        server {
+            server_name openai;
+            listen 6724;
+
+            default_type 'application/json';
+
+            location /v1/chat/completions {
+                content_by_lua_block {
+                    local json = require("cjson.safe")
+                    ngx.req.read_body()
+                    local body, err = json.decode(ngx.req.get_body_data())
+
+                    if not body or not body.messages then
+                        ngx.status = 400
+                        ngx.say([[{"error": "no messages"}]])
+                        return
+                    end
+
+                    -- Verify that system prompt has been converted to first message
+                    local first_msg = body.messages[1]
+                    if not first_msg or first_msg.role ~= "system" then
+                        ngx.status = 400
+                        ngx.say([[{"error": "system message not found or wrong role"}]])
+                        return
+                    end
+
+                    if first_msg.content ~= "You are a helpful assistant." then
+                        ngx.status = 400
+                        ngx.say([[{"error": "system message content mismatch: ]] .. (first_msg.content or "") .. [["}]])
+                        return
+                    end
+
+                    local second_msg = body.messages[2]
+                    if not second_msg or second_msg.role ~= "user" then
+                        ngx.status = 400
+                        ngx.say([[{"error": "user message not in correct position"}]])
+                        return
+                    end
+
+                    ngx.status = 200
+                    ngx.say([[{
+                        "id": "chatcmpl-sys",
+                        "object": "chat.completion",
+                        "model": "gpt-4o",
+                        "choices": [{
+                            "index": 0,
+                            "message": {"role": "assistant", "content": "system prompt ok"},
+                            "finish_reason": "stop"
+                        }],
+                        "usage": {"prompt_tokens": 10, "completion_tokens": 5, "total_tokens": 15}
+                    }]])
+                }
+            }
+        }
+--- config
+    location /t {
+        content_by_lua_block {
+            local http = require("resty.http")
+            local httpc = http.new()
+
+            local ok, err = httpc:connect({
+                scheme = "http",
+                host = "localhost",
+                port = ngx.var.server_port,
+            })
+
+            local res, err = httpc:request({
+                method = "POST",
+                path = "/v1/messages",
+                headers = { ["Content-Type"] = "application/json", ["Connection"] = "close" },
+                body = [[{
+                    "model": "claude-3-5-sonnet-20241022",
+                    "system": "You are a helpful assistant.",
+                    "messages": [{"role": "user", "content": "hello"}]
+                }]],
+            })
+
+            local body = res:read_body()
+            ngx.status = res.status
+            ngx.print(body)
+        }
+    }
+--- error_code: 200
+--- response_body eval
+qr/"text":"system prompt ok"/
+
+
+
+=== TEST 11: Set up route for tool calling conversion test
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                    "uri": "/v1/messages",
+                    "plugins": {
+                        "ai-proxy": {
+                            "provider": "openai",
+                            "auth": {
+                                "header": {
+                                    "Authorization": "Bearer token"
+                                }
+                            },
+                            "override": {
+                                "endpoint": "http://localhost:6724/v1/chat/completions"
+                            }
+                        }
+                    }
+                }]]
+            )
+
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 12: Tool calling request/response conversion (Anthropic <-> OpenAI)
+--- request
+POST /v1/messages
+{"model":"claude-3-5-sonnet-20241022","messages":[{"role":"user","content":"What is the weather in Paris?"}],"tools":[{"name":"get_weather","description":"Get weather","input_schema":{"type":"object","properties":{"location":{"type":"string"}},"required":["location"]}}]}
+--- more_headers
+Authorization: Bearer token
+test-type: tools
+--- error_code: 200
+--- response_body eval
+qr/(?=.*"stop_reason":"tool_use")(?=.*"type":"tool_use")(?=.*"name":"get_weather")/s
+
+
+
+=== TEST 13: Set up route for null finish_reason test
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                    "uri": "/v1/messages",
+                    "plugins": {
+                        "ai-proxy": {
+                            "provider": "openai",
+                            "auth": {
+                                "header": {
+                                    "Authorization": "Bearer token"
+                                }
+                            },
+                            "options": {
+                                "model": "gpt-4o",
+                                "stream": true
+                            },
+                            "override": {
+                                "endpoint": "http://localhost:6724/v1/chat/completions"
+                            }
+                        }
+                    }
+                }]]
+            )
+
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 14: message_stop emitted only once (finish_reason as JSON null must not trigger end events)
+--- http_config
+        server {
+            server_name openai;
+            listen 6724;
+
+            default_type 'application/json';
+
+            location /v1/chat/completions {
+                content_by_lua_block {
+                    ngx.header["Content-Type"] = "text/event-stream"
+
+                    -- chunk 1: role only, finish_reason = null (JSON null)
+                    ngx.say('data: {"id":"chatcmpl-null","object":"chat.completion.chunk","model":"gpt-4o","choices":[{"index":0,"delta":{"role":"assistant"},"finish_reason":null}]}' .. "\n")
+                    ngx.flush(true)
+
+                    -- chunk 2: content delta, finish_reason = null (JSON null)
+                    ngx.say('data: {"id":"chatcmpl-null","object":"chat.completion.chunk","model":"gpt-4o","choices":[{"index":0,"delta":{"content":"Hi"},"finish_reason":null}]}' .. "\n")
+                    ngx.flush(true)
+
+                    -- chunk 3: final, finish_reason = "stop"
+                    ngx.say('data: {"id":"chatcmpl-null","object":"chat.completion.chunk","model":"gpt-4o","choices":[{"index":0,"delta":{"content":"!"},"finish_reason":"stop"}],"usage":{"prompt_tokens":5,"completion_tokens":3,"total_tokens":8}}' .. "\n")
+                    ngx.flush(true)
+
+                    ngx.say("data: [DONE]\n")
+                }
+            }
+        }
+--- config
+    location /t {
+        content_by_lua_block {
+            local http = require("resty.http")
+            local httpc = http.new()
+
+            local ok, err = httpc:connect({
+                scheme = "http",
+                host = "localhost",
+                port = ngx.var.server_port,
+            })
+
+            local res, err = httpc:request({
+                method = "POST",
+                path = "/v1/messages",
+                headers = { ["Content-Type"] = "application/json", ["Connection"] = "close" },
+                body = [[{
+                    "model": "gpt-4o",
+                    "messages": [{"role": "user", "content": "Hi"}],
+                    "stream": true
+                }]],
+            })
+
+            local results = {}
+            while true do
+                local chunk, err = res.body_reader()
+                if not chunk then break end
+                table.insert(results, chunk)
+            end
+
+            local body = table.concat(results, "")
+
+            -- count occurrences of message_stop
+            local count = 0
+            for _ in body:gmatch('"type":"message_stop"') do
+                count = count + 1
+            end
+
+            -- also verify the final content delta and message_delta were emitted
+            local has_final_content = body:find('"text":"!"', 1, true) ~= nil
+            local has_message_delta = body:find('"type":"message_delta"', 1, true) ~= nil
+
+            if count ~= 1 then
+                ngx.say("FAIL: message_stop appeared " .. count .. " times, expected 1")
+            elseif not has_final_content then
+                ngx.say("FAIL: final content '!' not found in body")
+            elseif not has_message_delta then
+                ngx.say("FAIL: message_delta event not found in body")
+            else
+                ngx.say("OK: message_stop appeared exactly once")
+            end
+        }
+    }
+--- error_code: 200
+--- response_body
+OK: message_stop appeared exactly once
+
+
+
+=== TEST 15: Set up route for OpenRouter-style double finish_reason chunk test
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                    "uri": "/v1/messages",
+                    "plugins": {
+                        "ai-proxy": {
+                            "provider": "openai",
+                            "auth": {
+                                "header": {
+                                    "Authorization": "Bearer token"
+                                }
+                            },
+                            "options": {
+                                "model": "gpt-4o",
+                                "stream": true
+                            },
+                            "override": {
+                                "endpoint": "http://localhost:6724/v1/chat/completions"
+                            }
+                        }
+                    }
+                }]]
+            )
+
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 16: OpenRouter sends two finish_reason chunks — message_stop must appear exactly once, no empty content_block_delta
+--- http_config
+        server {
+            server_name openai;
+            listen 6724;
+
+            default_type 'application/json';
+
+            location /v1/chat/completions {
+                content_by_lua_block {
+                    ngx.header["Content-Type"] = "text/event-stream"
+
+                    -- chunk 1: first content token
+                    ngx.say('data: {"id":"gen-1","object":"chat.completion.chunk","model":"openai/gpt-4o","choices":[{"index":0,"delta":{"content":"Hi","role":"assistant"},"finish_reason":null,"native_finish_reason":null}]}' .. "\n")
+                    ngx.flush(true)
+
+                    -- chunk 2: second content token
+                    ngx.say('data: 
{"id":"gen-1","object":"chat.completion.chunk","model":"openai/gpt-4o","choices":[{"index":0,"delta":{"content":"!","role":"assistant"},"finish_reason":null,"native_finish_reason":null}]}'
 .. "\n")
+                    ngx.flush(true)
+
+                    -- chunk 3: finish_reason=stop, empty content, NO usage 
(OpenRouter first stop chunk)
+                    ngx.say('data: 
{"id":"gen-1","object":"chat.completion.chunk","model":"openai/gpt-4o","choices":[{"index":0,"delta":{"content":"","role":"assistant"},"finish_reason":"stop","native_finish_reason":"stop"}]}'
 .. "\n")
+                    ngx.flush(true)
+
+                    -- chunk 4: finish_reason=stop, empty content, WITH usage 
(OpenRouter second stop chunk)
+                    ngx.say('data: 
{"id":"gen-1","object":"chat.completion.chunk","model":"openai/gpt-4o","choices":[{"index":0,"delta":{"content":"","role":"assistant"},"finish_reason":"stop","native_finish_reason":"stop"}],"usage":{"prompt_tokens":8,"completion_tokens":2,"total_tokens":10}}'
 .. "\n")
+                    ngx.flush(true)
+
+                    ngx.say("data: [DONE]\n")
+                }
+            }
+        }
+--- config
+    location /t {
+        content_by_lua_block {
+            local http = require("resty.http")
+            local httpc = http.new()
+
+            local ok, err = httpc:connect({
+                scheme = "http",
+                host = "localhost",
+                port = ngx.var.server_port,
+            })
+
+            local res, err = httpc:request({
+                method = "POST",
+                path = "/v1/messages",
+                headers = { ["Content-Type"] = "application/json", 
["Connection"] = "close" },
+                body = [[{
+                    "model": "gpt-4o",
+                    "messages": [{"role": "user", "content": "Hi"}],
+                    "stream": true
+                }]],
+            })
+
+            local results = {}
+            while true do
+                local chunk, err = res.body_reader()
+                if not chunk then break end
+                table.insert(results, chunk)
+            end
+
+            local body = table.concat(results, "")
+
+            -- Count message_stop occurrences (must be exactly 1)
+            local stop_count = 0
+            for _ in body:gmatch('"type":"message_stop"') do
+                stop_count = stop_count + 1
+            end
+
+            -- Verify no empty text_delta events are emitted (finish_reason 
chunks with
+            -- empty content must not produce content_block_delta events).
+            -- content_block_start legitimately contains text:"", so we check 
the
+            -- per-event data for content_block_delta specifically.
+            local empty_delta_count = 0
+            for event_data in body:gmatch('event: content_block_delta\ndata: 
([^\n]+)') do
+                local decoded = require("cjson.safe").decode(event_data)
+                if decoded and decoded.delta and decoded.delta.text == "" then
+                    empty_delta_count = empty_delta_count + 1
+                end
+            end
+
+            -- Verify content tokens are present
+            local has_hi = body:find('"text":"Hi"', 1, true) ~= nil
+            local has_bang = body:find('"text":"!"', 1, true) ~= nil
+
+            if stop_count ~= 1 then
+                ngx.say("FAIL: message_stop appeared " .. stop_count .. " 
times, expected 1")
+            elseif empty_delta_count > 0 then
+                ngx.say("FAIL: found " .. empty_delta_count .. " empty 
text_delta(s), expected 0")
+            elseif not has_hi then
+                ngx.say("FAIL: content 'Hi' not found")
+            elseif not has_bang then
+                ngx.say("FAIL: content '!' not found")
+            else
+                ngx.say("OK: two finish_reason chunks handled correctly")
+            end
+        }
+    }
+--- error_code: 200
+--- response_body
+OK: two finish_reason chunks handled correctly
+
+
+
+=== TEST 17: Set up route for DeepSeek-style usage:null crash test
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                    "uri": "/v1/messages",
+                    "plugins": {
+                        "ai-proxy": {
+                            "provider": "openai",
+                            "auth": {
+                                "header": {
+                                    "Authorization": "Bearer token"
+                                }
+                            },
+                            "options": {
+                                "model": "deepseek-chat",
+                                "stream": true
+                            },
+                            "override": {
+                                "endpoint": "http://localhost:6724/v1/chat/completions"
+                            }
+                        }
+                    }
+                }]]
+            )
+
+            if code >= 300 then  -- surface admin API rejection as the response status
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 18: DeepSeek sends usage:null on non-final chunks — must not crash, content must be preserved
+--- http_config
+        server {
+            server_name openai;
+            listen 6724;
+
+            default_type 'application/json';
+
+            location /v1/chat/completions {
+                content_by_lua_block {
+                    ngx.header["Content-Type"] = "text/event-stream"
+
+                    -- chunk 1: role only, usage:null (DeepSeek pattern)
+                    ngx.say('data: {"id":"ds-1","object":"chat.completion.chunk","model":"deepseek-chat","choices":[{"index":0,"delta":{"role":"assistant","content":""},"finish_reason":null}],"usage":null}' .. "\n")
+                    ngx.flush(true)
+
+                    -- chunk 2: first content, usage:null
+                    ngx.say('data: {"id":"ds-1","object":"chat.completion.chunk","model":"deepseek-chat","choices":[{"index":0,"delta":{"content":"Hi"},"finish_reason":null}],"usage":null}' .. "\n")
+                    ngx.flush(true)
+
+                    -- chunk 3: second content, usage:null
+                    ngx.say('data: {"id":"ds-1","object":"chat.completion.chunk","model":"deepseek-chat","choices":[{"index":0,"delta":{"content":"!"},"finish_reason":null}],"usage":null}' .. "\n")
+                    ngx.flush(true)
+
+                    -- chunk 4: final, finish_reason=stop, usage populated
+                    ngx.say('data: {"id":"ds-1","object":"chat.completion.chunk","model":"deepseek-chat","choices":[{"index":0,"delta":{"content":""},"finish_reason":"stop"}],"usage":{"prompt_tokens":5,"completion_tokens":2,"total_tokens":7}}' .. "\n")
+                    ngx.flush(true)
+
+                    ngx.say("data: [DONE]\n")
+                }
+            }
+        }
+--- config
+    location /t {
+        content_by_lua_block {
+            local http = require("resty.http")
+            local httpc = http.new()
+
+            assert(httpc:connect({
+                scheme = "http",
+                host = "localhost",
+                port = ngx.var.server_port,
+            }))  -- raise with the connect error instead of continuing blindly
+
+            local res = assert(httpc:request({
+                method = "POST",
+                path = "/v1/messages",
+                headers = { ["Content-Type"] = "application/json", ["Connection"] = "close" },
+                body = [[{
+                    "model": "deepseek-chat",
+                    "messages": [{"role": "user", "content": "Hi"}],
+                    "stream": true
+                }]],
+            }))  -- raise with the request error rather than indexing a nil res
+
+            local results = {}
+            while true do
+                local chunk, read_err = res.body_reader()
+                if not chunk then assert(not read_err, read_err); break end  -- nil chunk + err = read failure
+                table.insert(results, chunk)
+            end
+
+            local body = table.concat(results, "")
+
+            local stop_count = 0
+            for _ in body:gmatch('"type":"message_stop"') do
+                stop_count = stop_count + 1
+            end
+
+            local has_hi = body:find('"text":"Hi"', 1, true) ~= nil
+            local has_bang = body:find('"text":"!"', 1, true) ~= nil
+            local has_message_start = body:find('event: message_start', 1, true) ~= nil
+
+            if not has_message_start then
+                ngx.say("FAIL: message_start not found (possible 500 crash)")
+            elseif stop_count ~= 1 then
+                ngx.say("FAIL: message_stop appeared " .. stop_count .. " times, expected 1")
+            elseif not has_hi then
+                ngx.say("FAIL: content 'Hi' not found")
+            elseif not has_bang then
+                ngx.say("FAIL: content '!' not found")
+            else
+                ngx.say("OK: DeepSeek usage:null chunks handled correctly")
+            end
+        }
+    }
+--- error_code: 200
+--- response_body
+OK: DeepSeek usage:null chunks handled correctly
+
+
+
+=== TEST 19: Set up route for first-chunk role+content test (OpenRouter pattern)
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                    "uri": "/v1/messages",
+                    "plugins": {
+                        "ai-proxy": {
+                            "provider": "openai",
+                            "auth": {
+                                "header": {
+                                    "Authorization": "Bearer token"
+                                }
+                            },
+                            "options": {
+                                "model": "gpt-4o",
+                                "stream": true
+                            },
+                            "override": {
+                                "endpoint": "http://localhost:6724/v1/chat/completions"
+                            }
+                        }
+                    }
+                }]]
+            )
+
+            if code >= 300 then  -- surface admin API rejection as the response status
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 20: First chunk contains both role and content simultaneously — content must not be lost
+--- http_config
+        server {
+            server_name openai;
+            listen 6724;
+
+            default_type 'application/json';
+
+            location /v1/chat/completions {
+                content_by_lua_block {
+                    ngx.header["Content-Type"] = "text/event-stream"
+
+                    -- chunk 1: role AND content in the same delta (OpenRouter pattern)
+                    ngx.say('data: {"id":"gen-1","object":"chat.completion.chunk","model":"openai/gpt-4o","choices":[{"index":0,"delta":{"content":"hello","role":"assistant"},"finish_reason":null}]}' .. "\n")
+                    ngx.flush(true)
+
+                    -- chunk 2: more content
+                    ngx.say('data: {"id":"gen-1","object":"chat.completion.chunk","model":"openai/gpt-4o","choices":[{"index":0,"delta":{"content":" world"},"finish_reason":null}]}' .. "\n")
+                    ngx.flush(true)
+
+                    -- chunk 3: final, finish_reason=stop, empty content, with usage
+                    ngx.say('data: {"id":"gen-1","object":"chat.completion.chunk","model":"openai/gpt-4o","choices":[{"index":0,"delta":{"content":""},"finish_reason":"stop"}],"usage":{"prompt_tokens":8,"completion_tokens":2,"total_tokens":10}}' .. "\n")
+                    ngx.flush(true)
+
+                    ngx.say("data: [DONE]\n")
+                }
+            }
+        }
+--- config
+    location /t {
+        content_by_lua_block {
+            local http = require("resty.http")
+            local httpc = http.new()
+
+            assert(httpc:connect({
+                scheme = "http",
+                host = "localhost",
+                port = ngx.var.server_port,
+            }))  -- raise with the connect error instead of continuing blindly
+
+            local res = assert(httpc:request({
+                method = "POST",
+                path = "/v1/messages",
+                headers = { ["Content-Type"] = "application/json", ["Connection"] = "close" },
+                body = [[{
+                    "model": "gpt-4o",
+                    "messages": [{"role": "user", "content": "Hi"}],
+                    "stream": true
+                }]],
+            }))  -- raise with the request error rather than indexing a nil res
+
+            local results = {}
+            while true do
+                local chunk, read_err = res.body_reader()
+                if not chunk then assert(not read_err, read_err); break end  -- nil chunk + err = read failure
+                table.insert(results, chunk)
+            end
+
+            local body = table.concat(results, "")
+
+            local has_hello = body:find('"hello"', 1, true) ~= nil
+            local has_world = body:find('" world"', 1, true) ~= nil
+            local stop_count = 0
+            for _ in body:gmatch('"type":"message_stop"') do
+                stop_count = stop_count + 1
+            end
+
+            if not has_hello then
+                ngx.say("FAIL: content 'hello' lost from first chunk")
+            elseif not has_world then
+                ngx.say("FAIL: content ' world' lost from second chunk")
+            elseif stop_count ~= 1 then
+                ngx.say("FAIL: message_stop appeared " .. stop_count .. " times, expected 1")
+            else
+                ngx.say("OK: first-chunk role+content preserved correctly")
+            end
+        }
+    }
+--- error_code: 200
+--- response_body
+OK: first-chunk role+content preserved correctly
+
+
+
+=== TEST 21: sse.encode output must end with \n\n (SSE spec requires blank-line event terminator)
+--- config
+    location /t {
+        content_by_lua_block {
+            local sse = require("apisix.plugins.ai-transport.sse")
+
+            -- Test a named event (e.g. message_stop)
+            local out = sse.encode({ type = "message_stop", data = '{"type":"message_stop"}' })
+            if out:sub(-2) ~= "\n\n" then
+                ngx.say("FAIL: named event does not end with \\n\\n, got: " ..
+                        string.format("%q", out:sub(-4)))
+                return
+            end
+
+            -- Test a plain data event (type == "message", no event: line)
+            local out2 = sse.encode({ type = "message", data = '{"foo":"bar"}' })
+            if out2:sub(-2) ~= "\n\n" then
+                ngx.say("FAIL: data-only event does not end with \\n\\n, got: " ..
+                        string.format("%q", out2:sub(-4)))
+                return
+            end
+
+            ngx.say("OK: sse.encode output ends with \\n\\n")
+        }
+    }
+--- response_body
+OK: sse.encode output ends with \n\n
+
+
+
+=== TEST 22: empty SSE data frames between real chunks must not trigger JSON decode warnings
+--- http_config
+        server {
+            server_name openai;
+            listen 6724;
+
+            default_type 'application/json';
+
+            location /v1/chat/completions {
+                content_by_lua_block {
+                    ngx.header["Content-Type"] = "text/event-stream"
+
+                    -- chunk 1: real content
+                    ngx.say('data: {"id":"chatcmpl-empty","object":"chat.completion.chunk","model":"gpt-4o","choices":[{"index":0,"delta":{"role":"assistant","content":"Hi"},"finish_reason":null}]}' .. "\n")
+                    ngx.flush(true)
+
+                    -- empty data frame: a "data:" field with no payload, then the event terminator
+                    ngx.say("data: \n")
+                    ngx.flush(true)
+
+                    -- chunk 2: final with finish_reason=stop
+                    ngx.say('data: {"id":"chatcmpl-empty","object":"chat.completion.chunk","model":"gpt-4o","choices":[{"index":0,"delta":{"content":""},"finish_reason":"stop"}],"usage":{"prompt_tokens":3,"completion_tokens":1,"total_tokens":4}}' .. "\n")
+                    ngx.flush(true)
+
+                    ngx.say("data: [DONE]\n")
+                }
+            }
+        }
+--- config
+    location /t {
+        content_by_lua_block {
+            local http = require("resty.http")
+            local httpc = http.new()
+
+            assert(httpc:connect({
+                scheme = "http",
+                host = "localhost",
+                port = ngx.var.server_port,
+            }))  -- raise with the connect error instead of continuing blindly
+
+            local res = assert(httpc:request({
+                method = "POST",
+                path = "/v1/messages",
+                headers = { ["Content-Type"] = "application/json", ["Connection"] = "close" },
+                body = [[{
+                    "model": "gpt-4o",
+                    "messages": [{"role": "user", "content": "Hi"}],
+                    "stream": true
+                }]],
+            }))  -- raise with the request error rather than indexing a nil res
+
+            local results = {}
+            while true do
+                local chunk, read_err = res.body_reader()
+                if not chunk then assert(not read_err, read_err); break end  -- nil chunk + err = read failure
+                table.insert(results, chunk)
+            end
+
+            local body = table.concat(results, "")
+            local has_content = body:find('"text":"Hi"', 1, true) ~= nil
+            local has_stop = body:find('"type":"message_stop"', 1, true) ~= nil
+
+            if not has_content then
+                ngx.say("FAIL: content 'Hi' not found")
+            elseif not has_stop then
+                ngx.say("FAIL: message_stop not found")
+            else
+                ngx.say("OK: empty data frame handled without error")
+            end
+        }
+    }
+--- error_code: 200
+--- response_body
+OK: empty data frame handled without error
+--- no_error_log
+failed to decode SSE data
+
+
+
+=== TEST 23: sse.encode handles edge cases correctly
+--- config
+    location /t {
+        content_by_lua_block {
+            local sse = require("apisix.plugins.ai-transport.sse")
+
+            -- empty string data: should still produce a valid SSE frame ending with \n\n
+            local out1 = sse.encode({ type = "content_block_delta", data = "" })
+            if out1:sub(-2) ~= "\n\n" then
+                ngx.say("FAIL: empty data does not end with \\n\\n")
+                return
+            end
+
+            -- large payload: must not be truncated, must end with \n\n
+            local large_data = string.rep("x", 8192)
+            local out2 = sse.encode({ type = "content_block_delta", data = large_data })
+            if out2:sub(-2) ~= "\n\n" then
+                ngx.say("FAIL: large payload does not end with \\n\\n")
+                return
+            end
+            if not out2:find(large_data, 1, true) then
+                ngx.say("FAIL: large payload was truncated")
+                return
+            end
+
+            -- special characters in data: escaped quotes, backslashes and "\n" sequences must be preserved
+            local special_data = '{"text":"line1\\nline2","quote":"\\"hello\\""}'
+            local out3 = sse.encode({ type = "content_block_delta", data = special_data })
+            if out3:sub(-2) ~= "\n\n" then
+                ngx.say("FAIL: special-char data does not end with \\n\\n")
+                return
+            end
+            if not out3:find(special_data, 1, true) then
+                ngx.say("FAIL: special characters were mangled")
+                return
+            end
+
+            ngx.say("OK: sse.encode edge cases passed")
+        }
+    }
+--- response_body
+OK: sse.encode edge cases passed
+
+
+
+=== TEST 24: Set up route for usage-only final chunk test
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                    "uri": "/v1/messages",
+                    "plugins": {
+                        "ai-proxy": {
+                            "provider": "openai",
+                            "auth": {
+                                "header": {
+                                    "Authorization": "Bearer token"
+                                }
+                            },
+                            "options": {
+                                "model": "gpt-4o",
+                                "stream": true
+                            },
+                            "override": {
+                                "endpoint": "http://localhost:6724/v1/chat/completions"
+                            }
+                        }
+                    }
+                }]]
+            )
+
+            if code >= 300 then  -- surface admin API rejection as the response status
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 25: usage in a separate chunk after message_stop — message_delta with usage must be emitted
+--- http_config
+        server {
+            server_name openai;
+            listen 6724;
+
+            default_type 'application/json';
+
+            location /v1/chat/completions {
+                content_by_lua_block {
+                    ngx.header["Content-Type"] = "text/event-stream"
+
+                    -- chunk 1: content
+                    ngx.say('data: {"id":"cmpl-1","object":"chat.completion.chunk","model":"gpt-4o","choices":[{"index":0,"delta":{"role":"assistant","content":"Hi"},"finish_reason":null}],"usage":null}' .. "\n")
+                    ngx.flush(true)
+
+                    -- chunk 2: finish_reason=stop, usage=null (usage not yet available)
+                    ngx.say('data: {"id":"cmpl-1","object":"chat.completion.chunk","model":"gpt-4o","choices":[{"index":0,"delta":{},"finish_reason":"stop"}],"usage":null}' .. "\n")
+                    ngx.flush(true)
+
+                    -- chunk 3: usage-only chunk with no choices (sent after the finish_reason chunk)
+                    ngx.say('data: {"id":"cmpl-1","object":"chat.completion.chunk","model":"gpt-4o","choices":[],"usage":{"prompt_tokens":10,"completion_tokens":5,"total_tokens":15}}' .. "\n")
+                    ngx.flush(true)
+
+                    ngx.say("data: [DONE]\n")
+                }
+            }
+        }
+--- config
+    location /t {
+        content_by_lua_block {
+            local http = require("resty.http")
+            local httpc = http.new()
+
+            assert(httpc:connect({
+                scheme = "http",
+                host = "localhost",
+                port = ngx.var.server_port,
+            }))  -- raise with the connect error instead of continuing blindly
+
+            local res = assert(httpc:request({
+                method = "POST",
+                path = "/v1/messages",
+                headers = { ["Content-Type"] = "application/json", ["Connection"] = "close" },
+                body = [[{
+                    "model": "gpt-4o",
+                    "messages": [{"role": "user", "content": "Hi"}],
+                    "stream": true
+                }]],
+            }))  -- raise with the request error rather than indexing a nil res
+
+            local results = {}
+            while true do
+                local chunk, read_err = res.body_reader()
+                if not chunk then assert(not read_err, read_err); break end  -- nil chunk + err = read failure
+                table.insert(results, chunk)
+            end
+
+            local body = table.concat(results, "")
+
+            -- message_stop must appear exactly once
+            local stop_count = 0
+            for _ in body:gmatch('"type":"message_stop"') do
+                stop_count = stop_count + 1
+            end
+
+            -- At least one message_delta must carry usage (input_tokens + output_tokens)
+            local has_usage = body:find('"input_tokens":10', 1, true) ~= nil
+                           and body:find('"output_tokens":5', 1, true) ~= nil
+
+            if stop_count ~= 1 then
+                ngx.say("FAIL: message_stop appeared " .. stop_count .. " times, expected 1")
+            elseif not has_usage then
+                ngx.say("FAIL: usage (input_tokens=10, output_tokens=5) not found in stream")
+            else
+                ngx.say("OK: usage-only chunk produced message_delta with usage")
+            end
+        }
+    }
+--- error_code: 200
+--- response_body
+OK: usage-only chunk produced message_delta with usage
+--- no_error_log
+[error]
+
+
+
+=== TEST 26: Anthropic SSE error event should be logged at warn level
+--- config
+    location /t {
+        content_by_lua_block {
+            local proto = require("apisix.plugins.ai-protocols.anthropic-messages")
+            local event = {
+                type = "error",
+                data = '{"type":"error","error":{"type":"overloaded_error","message":"Overloaded"}}'
+            }
+            local result = proto.parse_sse_event(event, {var = {}}, {})
+            ngx.say("type: ", result and result.type)  -- prints "type: nil" instead of raising if no event is returned
+        }
+    }
+--- response_body
+type: done
+--- error_log
+Anthropic SSE error: type=overloaded_error, message=Overloaded


Reply via email to