This is an automated email from the ASF dual-hosted git repository.
nic443 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git
The following commit(s) were added to refs/heads/master by this push:
new fc5d763d5 feat(ai-proxy): add native Anthropic Messages API protocol
support (#13181)
fc5d763d5 is described below
commit fc5d763d5dba4b2365054137b58f8d385a4bf644
Author: Nic <[email protected]>
AuthorDate: Thu Apr 9 10:16:26 2026 +0800
feat(ai-proxy): add native Anthropic Messages API protocol support (#13181)
---
apisix/plugins/ai-protocols/anthropic-messages.lua | 350 +++++
.../anthropic-messages-to-openai-chat.lua | 447 +++++++
apisix/plugins/ai-protocols/converters/init.lua | 3 +
apisix/plugins/ai-protocols/init.lua | 6 +-
apisix/plugins/ai-protocols/openai-chat.lua | 11 +-
apisix/plugins/ai-protocols/openai-embeddings.lua | 4 -
apisix/plugins/ai-providers/anthropic.lua | 1 +
apisix/plugins/ai-providers/base.lua | 11 +
apisix/plugins/ai-proxy/base.lua | 11 +-
t/APISIX.pm | 22 +-
t/plugin/ai-proxy-protocol-conversion.t | 1384 ++++++++++++++++++++
11 files changed, 2227 insertions(+), 23 deletions(-)
diff --git a/apisix/plugins/ai-protocols/anthropic-messages.lua
b/apisix/plugins/ai-protocols/anthropic-messages.lua
new file mode 100644
index 000000000..8e5ccbe46
--- /dev/null
+++ b/apisix/plugins/ai-protocols/anthropic-messages.lua
@@ -0,0 +1,350 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--- Anthropic Messages protocol adapter (client-side).
+-- Handles detection and response parsing for the Anthropic Messages API
+-- format. This adapter is for clients sending Anthropic-format requests.
+--
+-- Conversion logic (Anthropic↔OpenAI) lives in
+-- ai-protocols/converters/anthropic-messages-to-openai-chat.lua.
+
+local core = require("apisix.core")
+local uuid = require("resty.jit-uuid")
+local sse = require("apisix.plugins.ai-transport.sse")
+local string_sub = string.sub
+local table = table
+local type = type
+local ipairs = ipairs
+local setmetatable = setmetatable
+
+local _M = {}
+
+
+--- Report whether a request targets the Anthropic Messages API.
+-- A request qualifies when its URI ends in "/v1/messages" and its decoded
+-- body is a table.
+-- @param body any Decoded request body
+-- @param ctx table Request context; `ctx.var.uri` is inspected
+-- @return boolean|nil Truthy when the request is an Anthropic Messages call
+function _M.matches(body, ctx)
+    local var = ctx.var
+    local uri = var and var.uri
+    if not uri then
+        return uri
+    end
+
+    if string_sub(uri, -12) ~= "/v1/messages" then
+        return false
+    end
+
+    return type(body) == "table"
+end
+
+
+--- Tell whether the client asked for a streamed (SSE) response.
+-- Only a literal boolean `true` in `body.stream` counts as streaming.
+-- @param body table Decoded request body
+-- @return boolean
+function _M.is_streaming(body)
+    if body.stream ~= true then
+        return false
+    end
+    return true
+end
+
+
+
+-- Convert an Anthropic usage object into the canonical
+-- {prompt_tokens, completion_tokens, total_tokens} shape.
+-- `fallback_input` is used when `raw.input_tokens` is absent.
+local function to_canonical_usage(raw, fallback_input)
+    local prompt = raw.input_tokens or fallback_input or 0
+    local completion = raw.output_tokens or 0
+    return {
+        prompt_tokens = prompt,
+        completion_tokens = completion,
+        total_tokens = prompt + completion,
+    }
+end
+
+
+--- Parse a streaming SSE event in native Anthropic format.
+-- Used when the provider natively supports the Anthropic protocol.
+-- @param event table SSE event with `type` (event name) and raw JSON `data`
+-- @param ctx table Request context (unused)
+-- @param state table|nil Per-stream state. When it is a table, the
+--   `input_tokens` reported by `message_start` are remembered in
+--   `state.anthropic_input_tokens`, so that a later `message_delta` whose
+--   usage omits `input_tokens` does not report zero prompt tokens.
+-- @return table One of {type = "delta", texts}, {type = "usage", usage,
+--   raw_usage}, {type = "done"} or {type = "skip"}
+function _M.parse_sse_event(event, ctx, state)
+    local stream_state = type(state) == "table" and state or nil
+    local event_type = event.type
+
+    if event_type == "content_block_delta" then
+        local data, err = core.json.decode(event.data)
+        if type(data) ~= "table" then
+            core.log.warn("failed to decode SSE data: ", err)
+            return { type = "skip" }
+        end
+        if type(data.delta) == "table" and data.delta.type == "text_delta"
+                and type(data.delta.text) == "string" then
+            return {
+                type = "delta",
+                texts = { data.delta.text },
+            }
+        end
+        return { type = "skip" }
+
+    elseif event_type == "message_delta" then
+        local data, err = core.json.decode(event.data)
+        if type(data) ~= "table" then
+            core.log.warn("failed to decode message_delta: ", err)
+            return { type = "skip" }
+        end
+        if type(data.usage) == "table" then
+            -- message_delta usage usually carries only output_tokens; reuse
+            -- the input count seen in message_start when it is missing.
+            local known_input = stream_state
+                                and stream_state.anthropic_input_tokens
+            return {
+                type = "usage",
+                usage = to_canonical_usage(data.usage, known_input),
+                raw_usage = data.usage,
+            }
+        end
+        return { type = "skip" }
+
+    elseif event_type == "message_stop" then
+        return { type = "done" }
+
+    elseif event_type == "message_start" then
+        local data = core.json.decode(event.data)
+        if type(data) ~= "table" then
+            return { type = "skip" }
+        end
+        if type(data.message) == "table"
+                and type(data.message.usage) == "table" then
+            local usage = data.message.usage
+            if stream_state and type(usage.input_tokens) == "number" then
+                stream_state.anthropic_input_tokens = usage.input_tokens
+            end
+            return {
+                type = "usage",
+                usage = to_canonical_usage(usage),
+                raw_usage = usage,
+            }
+        end
+        return { type = "skip" }
+
+    elseif event_type == "error" then
+        local err_data = core.json.decode(event.data)
+        local err_obj = type(err_data) == "table" and err_data.error or nil
+        local err_type = type(err_obj) == "table" and err_obj.type or "unknown"
+        local err_msg = type(err_obj) == "table" and err_obj.message or "unknown"
+        core.log.warn("Anthropic SSE error: type=", err_type, ", message=", err_msg)
+        return { type = "done" }
+    end
+
+    return { type = "skip" }
+end
+
+
+--- Extract the assistant text from a native Anthropic response body.
+-- Every `text` block in `res_body.content` contributes its string, and the
+-- pieces are joined with a single space.
+-- @param res_body any Decoded response body
+-- @return string|nil Joined text, or nil when there is none
+function _M.extract_response_text(res_body)
+    local blocks = type(res_body) == "table" and res_body.content
+    if type(blocks) ~= "table" then
+        return nil
+    end
+
+    local texts = {}
+    for _, block in ipairs(blocks) do
+        if type(block) == "table" and block.type == "text"
+                and type(block.text) == "string" then
+            texts[#texts + 1] = block.text
+        end
+    end
+
+    if #texts == 0 then
+        return nil
+    end
+    return table.concat(texts, " ")
+end
+
+
+--- Build a non-streaming Anthropic request with a single user message.
+-- @param system_prompt string|nil Optional top-level system prompt
+-- @param user_content any Content of the single user message
+-- @param opts table|nil Optional `max_tokens` (defaults to 4096) and `model`
+-- @return table Request body
+function _M.build_simple_request(system_prompt, user_content, opts)
+    local options = opts or {}
+    local body = {
+        messages = {
+            { role = "user", content = user_content },
+        },
+        stream = false,
+        max_tokens = options.max_tokens or 4096,
+    }
+
+    if system_prompt then
+        body.system = system_prompt
+    end
+    if options.model then
+        body.model = options.model
+    end
+
+    return body
+end
+
+
+--- Extract token usage from a non-streaming Anthropic response.
+-- @param res_body any Decoded response body
+-- @return table|nil Canonical {prompt_tokens, completion_tokens,
+--   total_tokens}, or nil when `res_body.usage` is not a table
+-- @return table|nil The raw provider usage table
+function _M.extract_usage(res_body)
+    local raw = type(res_body) == "table" and res_body.usage
+    if type(raw) ~= "table" then
+        return nil, nil
+    end
+
+    local input = raw.input_tokens or 0
+    local output = raw.output_tokens or 0
+    local usage = {
+        prompt_tokens = input,
+        completion_tokens = output,
+        total_tokens = input + output,
+    }
+    return usage, raw
+end
+
+
+--- Collect the text of every request message, for moderation.
+-- A string message contributes itself; an array message contributes each of
+-- its `text` blocks. Non-table messages and non-text blocks are ignored.
+-- @param body table Decoded request body
+-- @return table Array of strings (possibly empty)
+function _M.extract_request_content(body)
+    local contents = {}
+    local messages = body.messages
+    if type(messages) ~= "table" then
+        return contents
+    end
+
+    for _, message in ipairs(messages) do
+        local content = type(message) == "table" and message.content
+        if type(content) == "string" then
+            contents[#contents + 1] = content
+        elseif type(content) == "table" then
+            for _, block in ipairs(content) do
+                if type(block) == "table" and block.type == "text"
+                        and type(block.text) == "string" then
+                    contents[#contents + 1] = block.text
+                end
+            end
+        end
+    end
+
+    return contents
+end
+
+
+-- Join the `text` of every text block in `blocks` with a single space.
+-- Returns nil when no block carries string text.
+local function join_text_blocks(blocks)
+    local texts = {}
+    for _, block in ipairs(blocks) do
+        if type(block) == "table" and block.type == "text"
+                and type(block.text) == "string" then
+            core.table.insert(texts, block.text)
+        end
+    end
+    if #texts == 0 then
+        return nil
+    end
+    return table.concat(texts, " ")
+end
+
+
+--- Get messages in canonical {role, content} format.
+-- Anthropic content blocks are flattened to plain text: text blocks are
+-- joined with a single space, other block types are dropped, and a message
+-- left with no text is omitted. The top-level `system` prompt, given as a
+-- string or as an array of text blocks, becomes a leading "system" message.
+-- Non-table entries in `body.messages` are skipped.
+-- @param body table Decoded Anthropic Messages request body
+-- @return table Array of {role, content} entries
+function _M.get_messages(body)
+    local messages = {}
+
+    local system = body.system
+    if type(system) == "string" then
+        core.table.insert(messages, {role = "system", content = system})
+    elseif type(system) == "table" then
+        local system_text = join_text_blocks(system)
+        if system_text then
+            core.table.insert(messages, {role = "system", content = system_text})
+        end
+    end
+
+    if type(body.messages) == "table" then
+        for _, message in ipairs(body.messages) do
+            if type(message) == "table" then
+                local content = message.content
+                if type(content) == "string" then
+                    core.table.insert(messages, {role = message.role, content = content})
+                elseif type(content) == "table" then
+                    local text = join_text_blocks(content)
+                    if text then
+                        core.table.insert(messages, {role = message.role, content = text})
+                    end
+                end
+            end
+        end
+    end
+
+    return messages
+end
+
+
+--- Prepend messages to the request body.
+-- The Anthropic Messages API has no "system" role inside `messages`. System
+-- prompts belong in the top-level `system` field, which is a string or an
+-- array of text blocks. Entries of `msgs` with role "system" and string
+-- content are therefore merged in front of `body.system`, in order. All other
+-- entries are prepended to `body.messages` in their original order.
+-- @param body table Decoded Anthropic request body (mutated in place)
+-- @param msgs table|nil Array of {role, content} to prepend
+function _M.prepend_messages(body, msgs)
+    if not msgs or #msgs == 0 then return end
+    if not body.messages then
+        body.messages = {}
+    end
+
+    local system_texts = {}
+    local new_messages = {}
+    for _, msg in ipairs(msgs) do
+        if msg.role == "system" and type(msg.content) == "string" then
+            core.table.insert(system_texts, msg.content)
+        else
+            core.table.insert(new_messages, {role = msg.role, content = msg.content})
+        end
+    end
+    for _, msg in ipairs(body.messages) do
+        core.table.insert(new_messages, msg)
+    end
+    body.messages = new_messages
+
+    if #system_texts == 0 then
+        return
+    end
+
+    local existing = body.system
+    if type(existing) == "table" then
+        for i, text in ipairs(system_texts) do
+            table.insert(existing, i, {type = "text", text = text})
+        end
+    elseif type(existing) == "string" and existing ~= "" then
+        body.system = table.concat(system_texts, "\n") .. "\n" .. existing
+    else
+        body.system = table.concat(system_texts, "\n")
+    end
+end
+
+
+--- Append messages to the request body.
+-- The Anthropic Messages API has no "system" role inside `messages`. System
+-- prompts belong in the top-level `system` field, which is a string or an
+-- array of text blocks. Entries of `msgs` with role "system" and string
+-- content are therefore appended to `body.system`, in order. All other
+-- entries are appended to `body.messages` in order.
+-- @param body table Decoded Anthropic request body (mutated in place)
+-- @param msgs table|nil Array of {role, content} to append
+function _M.append_messages(body, msgs)
+    if not msgs or #msgs == 0 then return end
+    if not body.messages then
+        body.messages = {}
+    end
+
+    local system_texts = {}
+    for _, msg in ipairs(msgs) do
+        if msg.role == "system" and type(msg.content) == "string" then
+            core.table.insert(system_texts, msg.content)
+        else
+            core.table.insert(body.messages, {role = msg.role, content = msg.content})
+        end
+    end
+
+    if #system_texts == 0 then
+        return
+    end
+
+    local existing = body.system
+    if type(existing) == "table" then
+        for _, text in ipairs(system_texts) do
+            core.table.insert(existing, {type = "text", text = text})
+        end
+    elseif type(existing) == "string" and existing ~= "" then
+        body.system = existing .. "\n" .. table.concat(system_texts, "\n")
+    else
+        body.system = table.concat(system_texts, "\n")
+    end
+end
+
+
+--- Return the request's raw `messages` value, for logging.
+-- The value is returned as-is, without copying.
+-- @param body table Decoded request body
+-- @return any `body.messages`
+function _M.get_request_content(body)
+    local messages = body.messages
+    return messages
+end
+
+
+--- Build a synthetic Anthropic assistant reply, e.g. for a denied request.
+-- @param opts table Fields: `text` (reply text), `model`, `usage` (usage
+--   object placed verbatim in the payload, may be nil) and `stream` (when
+--   true, return an SSE event sequence instead of a JSON body)
+-- @return string Encoded SSE events or JSON response body
+function _M.build_deny_response(opts)
+    if opts.stream then
+        local message_start = {
+            type = "message_start",
+            message = {
+                id = uuid.generate_v4(),
+                type = "message",
+                role = "assistant",
+                model = opts.model,
+                -- Must serialize as a JSON array; an empty Lua table would be
+                -- encoded as an object ("{}").
+                content = setmetatable({}, core.json.empty_array_mt),
+                usage = opts.usage,
+            },
+        }
+        local content_block_start = {
+            type = "content_block_start",
+            index = 0,
+            content_block = { type = "text", text = "" },
+        }
+        local content_block_delta = {
+            type = "content_block_delta",
+            index = 0,
+            delta = { type = "text_delta", text = opts.text },
+        }
+        local content_block_stop = {
+            type = "content_block_stop",
+            index = 0,
+        }
+        local message_delta = {
+            type = "message_delta",
+            delta = { stop_reason = "end_turn" },
+            usage = opts.usage,
+        }
+        -- Like every other event, message_stop carries its type in the data
+        -- payload; clients dispatch on it.
+        local message_stop = { type = "message_stop" }
+        return sse.encode({ type = "message_start",
+                            data = core.json.encode(message_start) })
+            .. sse.encode({ type = "content_block_start",
+                            data = core.json.encode(content_block_start) })
+            .. sse.encode({ type = "content_block_delta",
+                            data = core.json.encode(content_block_delta) })
+            .. sse.encode({ type = "content_block_stop",
+                            data = core.json.encode(content_block_stop) })
+            .. sse.encode({ type = "message_delta",
+                            data = core.json.encode(message_delta) })
+            .. sse.encode({ type = "message_stop",
+                            data = core.json.encode(message_stop) })
+    else
+        return core.json.encode({
+            id = uuid.generate_v4(),
+            type = "message",
+            role = "assistant",
+            model = opts.model,
+            content = {{
+                type = "text",
+                text = opts.text,
+            }},
+            stop_reason = "end_turn",
+            usage = opts.usage,
+        })
+    end
+end
+
+
+--- Build a zeroed Anthropic-style usage object.
+-- @return table {input_tokens = 0, output_tokens = 0}
+function _M.empty_usage()
+    local usage = { input_tokens = 0 }
+    usage.output_tokens = 0
+    return usage
+end
+
+
+-- Event names treated as data events.
+local data_event_types = {
+    content_block_delta = true,
+    message_delta = true,
+}
+
+--- Check whether an SSE event is a data event (content_block_delta or
+-- message_delta).
+-- @param event table SSE event with a `type` field
+-- @return boolean
+function _M.is_data_event(event)
+    return data_event_types[event.type] == true
+end
+
+
+--- Check whether an SSE event terminates the stream (`message_stop`).
+-- @param event table SSE event with a `type` field
+-- @return boolean
+function _M.is_done_event(event)
+    local event_type = event.type
+    return event_type == "message_stop"
+end
+
+
+--- Build the terminal Anthropic SSE event (`message_stop`).
+-- The data payload is {"type":"message_stop"}, as in Anthropic's streaming
+-- format, rather than an empty object.
+-- @return string Encoded SSE event
+function _M.build_done_event()
+    return sse.encode({ type = "message_stop",
+                        data = core.json.encode({ type = "message_stop" }) })
+end
+
+
+return _M
diff --git
a/apisix/plugins/ai-protocols/converters/anthropic-messages-to-openai-chat.lua
b/apisix/plugins/ai-protocols/converters/anthropic-messages-to-openai-chat.lua
new file mode 100644
index 000000000..518aba8d4
--- /dev/null
+++
b/apisix/plugins/ai-protocols/converters/anthropic-messages-to-openai-chat.lua
@@ -0,0 +1,447 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--- Converter: Anthropic Messages → OpenAI Chat Completions.
+-- Converts client requests from Anthropic Messages API format to
+-- OpenAI Chat Completions format, and converts provider responses
+-- back from OpenAI to Anthropic format.
+--
+-- Converters work DOWNSTREAM of adapters: the target adapter (openai-chat)
+-- parses the provider's response, and this converter transforms the parsed
+-- result into the client's format (Anthropic Messages).
+
+local core = require("apisix.core")
+local table = table
+local type = type
+local ipairs = ipairs
+local tostring = tostring
+local setmetatable = setmetatable
+
+-- Converter descriptor: translates between the client protocol `from`
+-- (Anthropic Messages) and the provider protocol `to` (OpenAI Chat).
+local _M = {}
+_M.from = "anthropic-messages"
+_M.to = "openai-chat"
+
+
+-- SSE event helpers
+
+--- Wrap a payload as an SSE event table whose `data` is the JSON encoding
+-- of `data`.
+local function make_sse_event(event_type, data)
+    local encoded = core.json.encode(data)
+    return { type = event_type, data = encoded }
+end
+
+--- Append a `content_block_stop` event for block `idx` to `events`.
+local function push_content_block_stop(events, idx)
+    local payload = {
+        type = "content_block_stop",
+        index = idx,
+    }
+    events[#events + 1] = make_sse_event("content_block_stop", payload)
+end
+
+--- Append a `content_block_start` event opening `block` at index `idx`.
+local function push_content_block_start(events, idx, block)
+    local payload = {
+        type = "content_block_start",
+        index = idx,
+        content_block = block,
+    }
+    events[#events + 1] = make_sse_event("content_block_start", payload)
+end
+
+--- Append a `content_block_delta` event carrying `delta` for block `idx`.
+local function push_content_block_delta(events, idx, delta)
+    local payload = {
+        type = "content_block_delta",
+        index = idx,
+        delta = delta,
+    }
+    events[#events + 1] = make_sse_event("content_block_delta", payload)
+end
+
+-- OpenAI `finish_reason` -> Anthropic `stop_reason`.
+-- Reasons not listed here are mapped to "end_turn" where the table is used.
+local openai_stop_reason_map = {}
+openai_stop_reason_map.stop = "end_turn"
+openai_stop_reason_map.length = "max_tokens"
+openai_stop_reason_map.content_filter = "end_turn"
+openai_stop_reason_map.tool_calls = "tool_use"
+
+
+-- Flatten an Anthropic `system` value (a string, or an array of text blocks)
+-- into one string. Non-text blocks are ignored; returns "" when nothing
+-- usable is present.
+local function system_to_text(system)
+    if type(system) == "string" then
+        return system
+    end
+    local parts = {}
+    if type(system) == "table" then
+        for _, block in ipairs(system) do
+            if type(block) == "table" and block.type == "text"
+                    and type(block.text) == "string" then
+                table.insert(parts, block.text)
+            end
+        end
+    end
+    return table.concat(parts)
+end
+
+
+-- Render Anthropic `tool_result.content` as the string used for an OpenAI
+-- "tool" message. An array made only of text blocks becomes their text
+-- joined by newlines. Any other table (e.g. one containing image blocks) is
+-- JSON-encoded. Scalars are stringified, and nil/false become "".
+local function tool_result_to_text(content)
+    if type(content) ~= "table" then
+        return tostring(content or "")
+    end
+    local texts = {}
+    for _, block in ipairs(content) do
+        if type(block) ~= "table" or block.type ~= "text"
+                or type(block.text) ~= "string" then
+            return core.json.encode(content)
+        end
+        table.insert(texts, block.text)
+    end
+    if #texts == 0 then
+        return core.json.encode(content)
+    end
+    return table.concat(texts, "\n")
+end
+
+
+-- Map an Anthropic `tool_choice` object ({type = "auto" | "any" | "tool" |
+-- "none", name?, disable_parallel_tool_use?}) onto OpenAI's `tool_choice`
+-- and `parallel_tool_calls`. A non-table value is left untouched.
+local function convert_tool_choice(openai_body, tool_choice)
+    if type(tool_choice) ~= "table" then
+        return
+    end
+
+    local choice_type = tool_choice.type
+    if choice_type == "auto" or choice_type == "none" then
+        openai_body.tool_choice = choice_type
+    elseif choice_type == "any" then
+        openai_body.tool_choice = "required"
+    elseif choice_type == "tool" and type(tool_choice.name) == "string" then
+        openai_body.tool_choice = {
+            type = "function",
+            ["function"] = { name = tool_choice.name },
+        }
+    else
+        -- Unrecognized shape: drop it rather than forward an Anthropic-style
+        -- object to an OpenAI endpoint.
+        openai_body.tool_choice = nil
+    end
+
+    -- parallel_tool_calls is only meaningful (and accepted) alongside tools.
+    if tool_choice.disable_parallel_tool_use == true
+            and type(openai_body.tools) == "table" and #openai_body.tools > 0 then
+        openai_body.parallel_tool_calls = false
+    end
+end
+
+
+--- Convert an incoming Anthropic request to OpenAI Chat format.
+-- @param request_table table Decoded Anthropic Messages request body
+-- @param ctx table Request context (unused)
+-- @return table|nil OpenAI Chat Completions body, built from a clone of
+--   `request_table`
+-- @return string|nil Error message when the request is malformed
+function _M.convert_request(request_table, ctx)
+    if type(request_table) ~= "table" then
+        return nil, "request body must be a table"
+    end
+
+    if type(request_table.messages) ~= "table" or
+            #request_table.messages == 0 then
+        return nil, "missing messages"
+    end
+
+    local openai_body = core.table.clone(request_table)
+
+    -- 1. Handle System Prompt: the top-level `system` becomes a leading
+    -- "system" message.
+    local messages = {}
+    if request_table.system then
+        local system_content = system_to_text(request_table.system)
+        if system_content ~= "" then
+            table.insert(messages, {
+                role = "system",
+                content = system_content
+            })
+        end
+        openai_body.system = nil
+    end
+
+    -- 2. Convert Messages (including tool calls and results)
+    for i, msg in ipairs(request_table.messages) do
+        if type(msg) ~= "table" or type(msg.role) ~= "string" then
+            return nil, "invalid message at index " .. i
+        end
+        if type(msg.content) ~= "string" and type(msg.content) ~= "table" then
+            return nil, "invalid message content at index " .. i
+        end
+
+        local new_msg = {
+            role = msg.role,
+            content = ""
+        }
+        local tool_results = {}
+
+        if type(msg.content) == "string" then
+            new_msg.content = msg.content
+        else
+            local tool_calls = {}
+            for _, block in ipairs(msg.content) do
+                if type(block) ~= "table" then
+                    core.log.warn("unexpected non-table content block in Anthropic ",
+                                  "request, skipping: ", tostring(block))
+                elseif block.type == "text" and type(block.text) == "string" then
+                    new_msg.content = new_msg.content .. block.text
+                elseif block.type == "tool_use" then
+                    if type(block.id) == "string" and type(block.name) == "string" then
+                        table.insert(tool_calls, {
+                            id = block.id,
+                            type = "function",
+                            ["function"] = {
+                                name = block.name,
+                                arguments = core.json.encode(block.input or {})
+                            }
+                        })
+                    end
+                elseif block.type == "tool_result" then
+                    if type(block.tool_use_id) == "string" then
+                        table.insert(tool_results, {
+                            role = "tool",
+                            tool_call_id = block.tool_use_id,
+                            content = tool_result_to_text(block.content)
+                        })
+                    end
+                end
+            end
+
+            if #tool_calls > 0 then
+                new_msg.tool_calls = tool_calls
+                if new_msg.content == "" then
+                    new_msg.content = nil
+                end
+            end
+        end
+
+        if #tool_results > 0 then
+            -- OpenAI requires every "tool" message to follow the assistant
+            -- message that carries the matching tool_calls. The results
+            -- therefore go first, and any accompanying text comes after them.
+            for _, tr in ipairs(tool_results) do
+                table.insert(messages, tr)
+            end
+            if new_msg.content and new_msg.content ~= "" then
+                table.insert(messages, { role = msg.role, content = new_msg.content })
+            end
+        else
+            table.insert(messages, new_msg)
+        end
+    end
+    openai_body.messages = messages
+
+    -- 3. Convert Tools Definition
+    if type(request_table.tools) == "table" then
+        local openai_tools = {}
+        for i, tool in ipairs(request_table.tools) do
+            if type(tool) ~= "table" or type(tool.name) ~= "string" or tool.name == "" then
+                return nil, "invalid tool definition at index " .. i
+            end
+            table.insert(openai_tools, {
+                type = "function",
+                ["function"] = {
+                    name = tool.name,
+                    description = tool.description,
+                    parameters = tool.input_schema
+                }
+            })
+        end
+        openai_body.tools = openai_tools
+    end
+
+    -- 4. Map tool_choice from the Anthropic object form to the OpenAI form
+    convert_tool_choice(openai_body, request_table.tool_choice)
+
+    -- 5. Map Parameters
+    if openai_body.max_tokens then
+        openai_body.max_completion_tokens = openai_body.max_tokens
+    end
+
+    if openai_body.stop_sequences then
+        openai_body.stop = openai_body.stop_sequences
+        openai_body.stop_sequences = nil
+    end
+
+    return openai_body
+end
+
+
+--- Convert a non-streaming OpenAI Chat response back to Anthropic format.
+-- @param res_body table Decoded OpenAI chat.completion response
+-- @param ctx table Request context; `ctx.var.llm_model`, when set, is
+--   reported as the model instead of `res_body.model`
+-- @return table|nil Anthropic Messages response
+-- @return string|nil Error message
+function _M.convert_response(res_body, ctx)
+    if type(res_body) ~= "table" then
+        return nil, "response body must be a table"
+    end
+
+    local choice = res_body.choices and res_body.choices[1]
+    if not choice then
+        return nil, "no choices in response"
+    end
+
+    local model = ctx.var.llm_model
+
+    local content = {}
+    local text = choice.message and choice.message.content
+    if type(text) == "string" and text ~= "" then
+        table.insert(content, { type = "text", text = text })
+    end
+
+    if choice.message and type(choice.message.tool_calls) == "table" then
+        for _, tc in ipairs(choice.message.tool_calls) do
+            -- Malformed (non-table) entries are skipped instead of raising.
+            if type(tc) == "table" then
+                local fn = type(tc["function"]) == "table" and tc["function"] or {}
+                local input = {}
+                -- An empty arguments string carries no JSON; treat it as an
+                -- empty input object instead of failing the whole response.
+                if type(fn.arguments) == "string" and fn.arguments ~= "" then
+                    local decoded, err = core.json.decode(fn.arguments)
+                    if decoded == nil then
+                        return nil, "invalid tool_call arguments: " .. (err or "decode error")
+                    end
+                    input = decoded
+                end
+                table.insert(content, {
+                    type = "tool_use",
+                    id = tc.id or "",
+                    name = fn.name or "",
+                    input = input
+                })
+            end
+        end
+    end
+
+    if #content == 0 then
+        content = {{ type = "text", text = "" }}
+    end
+
+    local anthropic_res = {
+        id = res_body.id,
+        type = "message",
+        role = "assistant",
+        model = model or res_body.model,
+        content = content,
+        stop_reason = openai_stop_reason_map[choice.finish_reason] or "end_turn",
+        usage = {
+            input_tokens = res_body.usage and res_body.usage.prompt_tokens or 0,
+            output_tokens = res_body.usage and res_body.usage.completion_tokens or 0,
+        }
+    }
+
+    if res_body.usage and res_body.usage.prompt_tokens_details then
+        anthropic_res.usage.cache_read_input_tokens =
+            res_body.usage.prompt_tokens_details.cached_tokens or 0
+    end
+
+    return anthropic_res
+end
+
+
+--- Convert an OpenAI SSE chunk to Anthropic SSE events.
+-- Anthropic streams content as indexed blocks. Each block is bracketed by
+-- content_block_start / content_block_stop, and this function keeps at most
+-- one block open at a time. After finish_reason, message_delta and
+-- message_stop are deferred so a trailing usage-only chunk can be merged
+-- into message_delta.
+-- @param openai_chunk table Decoded OpenAI chat.completion.chunk
+-- @param state table Mutable stream state (is_first, content_index,
+--   current_open_block, open_block_type, tool_call_indices, is_done,
+--   pending_stop, pending_message_delta)
+-- @return table Array of {type, data} SSE events for the client
+local function openai_to_anthropic_sse(openai_chunk, state)
+    if type(openai_chunk) ~= "table" then
+        return {}
+    end
+    if type(state) ~= "table" then
+        return {}
+    end
+    local events = {}
+    local choice = openai_chunk.choices and openai_chunk.choices[1]
+
+    -- If finish_reason was seen, message_delta + message_stop were deferred to
+    -- allow a trailing usage-only chunk to be merged in. Flush now.
+    if state.is_done then
+        if state.pending_stop then
+            local message_delta = state.pending_message_delta
+            if type(openai_chunk.usage) == "table" and not message_delta.usage then
+                message_delta.usage = {
+                    input_tokens = openai_chunk.usage.prompt_tokens or 0,
+                    output_tokens = openai_chunk.usage.completion_tokens or 0,
+                }
+            end
+            table.insert(events, make_sse_event("message_delta", message_delta))
+            table.insert(events, make_sse_event("message_stop", { type = "message_stop" }))
+            state.pending_stop = false
+            state.pending_message_delta = nil
+        end
+        return events
+    end
+
+    -- 1. Initialize the message on the first chunk. A missing content_index
+    -- also triggers this, so a state table without `is_first` never reaches
+    -- the index arithmetic below uninitialized.
+    if state.is_first or state.content_index == nil then
+        local message = {
+            id = openai_chunk.id,
+            type = "message",
+            role = "assistant",
+            model = openai_chunk.model,
+            content = {},
+            usage = { input_tokens = 0, output_tokens = 0 }
+        }
+        setmetatable(message.content, core.json.empty_array_mt)
+
+        table.insert(events, make_sse_event("message_start", {
+            type = "message_start",
+            message = message,
+        }))
+        push_content_block_start(events, 0, { type = "text", text = "" })
+
+        state.is_first = false
+        state.content_index = 0
+        state.current_open_block = 0
+        state.open_block_type = "text"
+        state.tool_call_indices = {}
+    end
+
+    -- 2. Handle text content delta
+    if choice and choice.delta and type(choice.delta.content) == "string"
+            and choice.delta.content ~= "" then
+        if state.open_block_type ~= "text" then
+            -- The previous text block was closed when a tool_use block
+            -- opened. Deltas belong between a block's start and stop, so
+            -- open a fresh text block.
+            if state.current_open_block ~= nil then
+                push_content_block_stop(events, state.current_open_block)
+            end
+            state.content_index = state.content_index + 1
+            state.current_open_block = state.content_index
+            state.open_block_type = "text"
+            push_content_block_start(events, state.content_index,
+                                     { type = "text", text = "" })
+        end
+        push_content_block_delta(events, state.current_open_block, {
+            type = "text_delta",
+            text = choice.delta.content,
+        })
+    end
+
+    -- 3. Handle tool_calls deltas
+    if choice and choice.delta and type(choice.delta.tool_calls) == "table" then
+        for _, tc_delta in ipairs(choice.delta.tool_calls) do
+            local tc_idx = type(tc_delta) == "table" and tc_delta.index or nil
+            if tc_idx ~= nil then
+                if not state.tool_call_indices[tc_idx] then
+                    if state.current_open_block ~= nil then
+                        push_content_block_stop(events, state.current_open_block)
+                    end
+                    state.content_index = state.content_index + 1
+                    state.tool_call_indices[tc_idx] = state.content_index
+                    state.current_open_block = state.content_index
+                    state.open_block_type = "tool_use"
+
+                    local start_fn = tc_delta["function"] or {}
+                    push_content_block_start(events, state.content_index, {
+                        type = "tool_use",
+                        id = tc_delta.id or "",
+                        name = start_fn.name or "",
+                        input = {},
+                    })
+                end
+
+                local fn = tc_delta["function"]
+                local args = fn and fn.arguments
+                if type(args) == "string" and args ~= "" then
+                    push_content_block_delta(events, state.tool_call_indices[tc_idx], {
+                        type = "input_json_delta",
+                        partial_json = args,
+                    })
+                end
+            end
+        end
+    end
+
+    -- 4. Handle stream completion
+    if choice and type(choice.finish_reason) == "string" then
+        if state.current_open_block ~= nil then
+            push_content_block_stop(events, state.current_open_block)
+            state.current_open_block = nil
+        end
+        state.open_block_type = nil
+
+        local message_delta = {
+            type = "message_delta",
+            delta = {
+                stop_reason = openai_stop_reason_map[choice.finish_reason] or "end_turn",
+            },
+        }
+
+        if type(openai_chunk.usage) == "table" then
+            message_delta.usage = {
+                input_tokens = openai_chunk.usage.prompt_tokens or 0,
+                output_tokens = openai_chunk.usage.completion_tokens or 0,
+            }
+        end
+
+        state.pending_message_delta = message_delta
+        state.pending_stop = true
+        state.is_done = true
+    end
+
+    return events
+end
+
+
+--- Translate one parsed OpenAI stream event into Anthropic SSE events.
+-- Expects the output of the openai-chat adapter's parse_sse_event():
+-- "skip" results are ignored. A "done" result flushes the deferred
+-- message_delta/message_stop if one is pending. Any other result with
+-- `data` is converted chunk by chunk.
+-- @param parsed table|nil Parsed SSE event from the target adapter
+-- @param _ table Request context (unused)
+-- @param state table Mutable converter state
+-- @return table|nil Anthropic SSE events for the client, or nil
+function _M.convert_sse_events(parsed, _, state)
+    if not parsed then
+        return nil
+    end
+
+    local kind = parsed.type
+    if kind == "skip" then
+        return nil
+    end
+
+    if kind == "done" then
+        if not state.pending_stop then
+            return nil
+        end
+        return openai_to_anthropic_sse({ choices = {} }, state)
+    end
+
+    local chunk = parsed.data
+    if not chunk then
+        return nil
+    end
+    return openai_to_anthropic_sse(chunk, state)
+end
+
+
+return _M
diff --git a/apisix/plugins/ai-protocols/converters/init.lua
b/apisix/plugins/ai-protocols/converters/init.lua
index f0dba21b0..cbe0e2261 100644
--- a/apisix/plugins/ai-protocols/converters/init.lua
+++ b/apisix/plugins/ai-protocols/converters/init.lua
@@ -62,6 +62,9 @@ end
-- To add a new converter, create a module file and add one line here.
---------------------------------------------------------------------
+register(require(
+
"apisix.plugins.ai-protocols.converters.anthropic-messages-to-openai-chat"))
+
register(require(
"apisix.plugins.ai-protocols.converters.openai-embeddings-to-vertex-predict"))
diff --git a/apisix/plugins/ai-protocols/init.lua
b/apisix/plugins/ai-protocols/init.lua
index c290292c5..583c7e4aa 100644
--- a/apisix/plugins/ai-protocols/init.lua
+++ b/apisix/plugins/ai-protocols/init.lua
@@ -29,10 +29,12 @@ local _M = {}
local registered = {
["openai-chat"] = require("apisix.plugins.ai-protocols.openai-chat"),
["openai-embeddings"] =
require("apisix.plugins.ai-protocols.openai-embeddings"),
+ ["anthropic-messages"] =
require("apisix.plugins.ai-protocols.anthropic-messages"),
}
--- Detection order: body-only (chat, embeddings).
+-- Detection order: URL+body first (anthropic), then body-only (chat,
embeddings).
local detection_order = {
+ { name = "anthropic-messages", protocol = registered["anthropic-messages"]
},
{ name = "openai-chat", protocol = registered["openai-chat"] },
{ name = "openai-embeddings", protocol = registered["openai-embeddings"] },
}
@@ -41,7 +43,7 @@ local detection_order = {
--- Detect the client protocol by asking each protocol if it matches.
-- @param body table The parsed request body
-- @param ctx table The request context
--- @return string Protocol name: "openai-chat" | "openai-embeddings"
+-- @return string Protocol name: "openai-chat" | "openai-embeddings" |
"anthropic-messages"
function _M.detect(body, ctx)
for _, entry in ipairs(detection_order) do
if entry.protocol.matches(body, ctx) then
diff --git a/apisix/plugins/ai-protocols/openai-chat.lua
b/apisix/plugins/ai-protocols/openai-chat.lua
index 22e1ee881..1875b4908 100644
--- a/apisix/plugins/ai-protocols/openai-chat.lua
+++ b/apisix/plugins/ai-protocols/openai-chat.lua
@@ -41,15 +41,14 @@ function _M.is_streaming(body)
end
---- Prepare the request body for sending.
--- Injects stream_options if streaming, extracts model info.
--- @return table body (possibly mutated)
--- @return string|nil model from request body
-function _M.prepare_request(body, ctx, opts)
+--- Prepare the outgoing request body for the target provider.
+-- Injects stream_options so the provider includes usage in streaming
responses.
+-- Called after protocol conversion in build_request(), covering both
passthrough
+-- and convert scenarios.
+function _M.prepare_outgoing_request(body)
if body.stream then
body.stream_options = { include_usage = true }
end
- return body, body.model
end
diff --git a/apisix/plugins/ai-protocols/openai-embeddings.lua
b/apisix/plugins/ai-protocols/openai-embeddings.lua
index b2da98ef9..51feb24a5 100644
--- a/apisix/plugins/ai-protocols/openai-embeddings.lua
+++ b/apisix/plugins/ai-protocols/openai-embeddings.lua
@@ -36,10 +36,6 @@ function _M.is_streaming(_)
end
-function _M.prepare_request(body, _, _)
- return body, body and body.model
-end
-
function _M.extract_usage(res_body)
if not res_body or not res_body.usage then
diff --git a/apisix/plugins/ai-providers/anthropic.lua
b/apisix/plugins/ai-providers/anthropic.lua
index 6c4afafdb..2e79e5da8 100644
--- a/apisix/plugins/ai-providers/anthropic.lua
+++ b/apisix/plugins/ai-providers/anthropic.lua
@@ -21,6 +21,7 @@ return require("apisix.plugins.ai-providers.base").new(
port = 443,
capabilities = {
["openai-chat"] = { path = "/v1/chat/completions" },
+ ["anthropic-messages"] = { path = "/v1/messages" },
},
}
)
diff --git a/apisix/plugins/ai-providers/base.lua
b/apisix/plugins/ai-providers/base.lua
index b5f79a493..4a69f9eaf 100644
--- a/apisix/plugins/ai-providers/base.lua
+++ b/apisix/plugins/ai-providers/base.lua
@@ -35,6 +35,7 @@ local sse = require("apisix.plugins.ai-transport.sse")
local transport_http = require("apisix.plugins.ai-transport.http")
local transport_auth = require("apisix.plugins.ai-transport.auth")
local log_sanitize = require("apisix.utils.log-sanitize")
+local protocols = require("apisix.plugins.ai-protocols")
local ngx = ngx
local ngx_now = ngx.now
@@ -93,6 +94,16 @@ function _M.build_request(self, ctx, conf, request_body,
opts)
request_body = converted
end
+ -- Inject target-protocol-specific parameters (e.g. stream_options for
OpenAI).
+ -- This runs after conversion so it covers both passthrough and convert
scenarios.
+ local target_protocol = ctx.ai_target_protocol
+ if target_protocol then
+ local target_proto = protocols.get(target_protocol)
+ if target_proto and target_proto.prepare_outgoing_request then
+ target_proto.prepare_outgoing_request(request_body)
+ end
+ end
+
core.log.info("request extra_opts to LLM server: ",
core.json.delay_encode(log_sanitize.redact_extra_opts(opts),
true))
diff --git a/apisix/plugins/ai-proxy/base.lua b/apisix/plugins/ai-proxy/base.lua
index 0960366eb..3cf054e45 100644
--- a/apisix/plugins/ai-proxy/base.lua
+++ b/apisix/plugins/ai-proxy/base.lua
@@ -100,8 +100,8 @@ end
-- Execute the AI proxy pipeline:
-- 1. Validate request
-- 2. Route client protocol to driver capability (passthrough / convert /
error)
--- 3. Prepare request via protocol (stream_options, model extraction)
--- 4. Build HTTP request (protocol conversion, auth, headers)
+-- 3. Extract model from request body
+-- 4. Build HTTP request (protocol conversion, target protocol params, auth,
headers)
-- 5. Send via transport
-- 6. Parse response (streaming or non-streaming)
-- 7. Set keepalive
@@ -156,11 +156,10 @@ function _M.before_proxy(conf, ctx, on_error)
target_proto = target_protocol
end
ctx.ai_converter = converter
+ ctx.ai_target_protocol = target_proto
- -- Step 2: Prepare request via protocol
- local request_model
- request_body, request_model = client_proto.prepare_request(
- request_body, ctx, extra_opts)
+ -- Step 2: Extract model from request
+ local request_model = request_body.model
if request_model then
ctx.var.request_llm_model = request_model
diff --git a/t/APISIX.pm b/t/APISIX.pm
index ac0edef49..78e676c9d 100644
--- a/t/APISIX.pm
+++ b/t/APISIX.pm
@@ -777,6 +777,22 @@ _EOC_
$ipv6_listen_conf = "listen \[::1\]:1984;"
}
+ my $enable_test_control_api_v1 =
+ !defined($ENV{TEST_ENABLE_CONTROL_API_V1}) ||
+ $ENV{TEST_ENABLE_CONTROL_API_V1} ne "0";
+
+ my $control_api_v1_location = "";
+ if ($enable_test_control_api_v1) {
+ $control_api_v1_location = <<_EOC_;
+ location /v1/ {
+ content_by_lua_block {
+ apisix.http_control()
+ }
+ }
+
+_EOC_
+ }
+
my $config = $block->config // '';
$config .= <<_EOC_;
$ipv6_listen_conf
@@ -825,11 +841,7 @@ _EOC_
}
}
- location /v1/ {
- content_by_lua_block {
- apisix.http_control()
- }
- }
+ $control_api_v1_location
location / {
set \$upstream_mirror_host '';
diff --git a/t/plugin/ai-proxy-protocol-conversion.t
b/t/plugin/ai-proxy-protocol-conversion.t
new file mode 100644
index 000000000..a6ee0f2e2
--- /dev/null
+++ b/t/plugin/ai-proxy-protocol-conversion.t
@@ -0,0 +1,1384 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Turn off the /v1/ control-API location that t::APISIX adds to the test
+# server by default; it would otherwise capture /v1/messages before the
+# route under test is reached.
+BEGIN { $ENV{TEST_ENABLE_CONTROL_API_V1} = "0" }
+
+use t::APISIX 'no_plan';
+
+log_level("info");
+repeat_each(1);
+no_long_string();
+no_root_location();
+
+# JSON fixture written back by the default mock upstream's success path.
+# The filehandle is scoped to the do-block and closed when it goes away.
+my $resp_file = 't/assets/ai-proxy-response.json';
+my $resp = do {
+    open(my $fh, '<', $resp_file) or die "Could not open file '$resp_file' $!";
+    local $/;    # slurp mode: read the entire file at once
+    <$fh>;
+};
+
+
+# Per-block defaults: issue "GET /t" when the block has no request, and
+# provide a mock OpenAI-style upstream on port 6724 unless the block
+# declares its own --- http_config (which then replaces this server).
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    if (!defined $block->request) {
+        $block->set_value("request", "GET /t");
+    }
+
+    my $http_config = $block->http_config // <<_EOC_;
+    server {
+        server_name openai;
+        listen 6724;
+
+        default_type 'application/json';
+
+        location /v1/chat/completions {
+            content_by_lua_block {
+                local json = require("cjson.safe")
+
+                -- Reject anything but POST and stop here.
+                if ngx.req.get_method() ~= "POST" then
+                    ngx.status = 400
+                    ngx.say("Unsupported request method: ", ngx.req.get_method())
+                    return
+                end
+
+                -- Decode the body once and reuse it. body is nil when the
+                -- payload is absent or not valid JSON, so each branch guards it.
+                ngx.req.read_body()
+                local body = json.decode(ngx.req.get_body_data() or "")
+                local headers = ngx.req.get_headers()
+
+                -- The test-type header selects a canned scenario that runs
+                -- before the auth checks below.
+                local test_type = headers["test-type"]
+                if test_type == "options" then
+                    if body and body.foo == "bar" then
+                        ngx.status = 200
+                        ngx.say("options works")
+                    else
+                        ngx.status = 500
+                        ngx.say("model options feature doesn't work")
+                    end
+                    return
+                end
+
+                if test_type == "tools" then
+                    -- Verify request was converted from Anthropic to OpenAI tools format
+                    local tool = body and body.tools and body.tools[1]
+                    if not tool or tool.type ~= "function" or
+                       not tool["function"] or tool["function"].name ~= "get_weather" then
+                        ngx.status = 400
+                        ngx.say([[{"error": "tool not converted to openai format"}]])
+                        return
+                    end
+                    ngx.status = 200
+                    ngx.say([[{
+                        "id": "chatcmpl-tool",
+                        "object": "chat.completion",
+                        "model": "gpt-4o",
+                        "choices": [{
+                            "index": 0,
+                            "message": {
+                                "role": "assistant",
+                                "content": null,
+                                "tool_calls": [{
+                                    "id": "call_abc123",
+                                    "type": "function",
+                                    "function": {
+                                        "name": "get_weather",
+                                        "arguments": "{\\\"location\\\": \\\"Paris\\\"}"
+                                    }
+                                }]
+                            },
+                            "finish_reason": "tool_calls"
+                        }],
+                        "usage": {"prompt_tokens": 20, "completion_tokens": 10, "total_tokens": 30}
+                    }]])
+                    return
+                end
+
+                -- Accept either the bearer header or the apikey query arg.
+                local header_auth = headers["authorization"]
+                local query_auth = ngx.req.get_uri_args()["apikey"]
+
+                if header_auth ~= "Bearer token" and query_auth ~= "apikey" then
+                    ngx.status = 401
+                    ngx.say("Unauthorized")
+                    return
+                end
+
+                -- Authenticated from here on.
+                if not body or not body.messages or #body.messages < 1 then
+                    ngx.status = 400
+                    ngx.say([[{ "error": "bad request"}]])
+                    return
+                end
+
+                if body.messages[1].content == "write an SQL query to get all rows from student table" then
+                    ngx.print("SELECT * FROM STUDENTS")
+                    return
+                end
+
+                ngx.status = 200
+                ngx.say([[$resp]])
+            }
+        }
+
+        location /random {
+            content_by_lua_block {
+                ngx.say("path override works")
+            }
+        }
+    }
+_EOC_
+
+    $block->set_value("http_config", $http_config);
+});
+
+run_tests();
+
+__DATA__
+
+=== TEST 1: Set up route – Anthropic protocol auto-detected via exact /v1/messages URI (Non-stream)
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/routes/1',
+ ngx.HTTP_PUT,
+ [[{
+ "uri": "/v1/messages",
+ "plugins": {
+ "ai-proxy": {
+ "provider": "openai",
+ "auth": {
+ "header": {
+ "Authorization": "Bearer token"
+ }
+ },
+ "override": {
+ "endpoint":
"http://localhost:6724/v1/chat/completions"
+ }
+ }
+ }
+ }]]
+ )
+
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+--- response_body
+passed
+
+
+
+=== TEST 2: Send Anthropic request to /v1/messages and verify protocol conversion
+--- request
+POST /v1/messages
+{ "model": "claude-3-5-sonnet-20241022", "messages": [ { "role":
"user","content": "hello" } ] }
+--- more_headers
+Authorization: Bearer token
+--- error_code: 200
+--- response_body eval
+qr/"text":"1 \+ 1 = 2\."/
+
+
+
+=== TEST 3: Missing messages field returns 400
+--- request
+POST /v1/messages
+{ "model": "claude-3-5-sonnet-20241022" }
+--- more_headers
+Authorization: Bearer token
+--- error_code: 400
+--- response_body
+{"error_msg":"missing messages"}
+
+
+
+=== TEST 4: Malformed JSON body returns 400
+--- request
+POST /v1/messages
+this is not valid json
+--- more_headers
+Authorization: Bearer token
+Content-Type: application/json
+--- error_code: 400
+
+
+
+=== TEST 5: messages field is wrong type (non-array)
+--- request
+POST /v1/messages
+{ "model": "claude-3-5-sonnet-20241022", "messages": "hello" }
+--- more_headers
+Authorization: Bearer token
+--- error_code: 400
+--- response_body
+{"error_msg":"missing messages"}
+
+
+
+=== TEST 6: messages is an empty array
+--- request
+POST /v1/messages
+{ "model": "claude-3-5-sonnet-20241022", "messages": [] }
+--- more_headers
+Authorization: Bearer token
+--- error_code: 400
+--- response_body
+{"error_msg":"missing messages"}
+
+
+
+=== TEST 7: Set up route for stream test – exact URI /v1/messages triggers Anthropic detection
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/routes/1',
+ ngx.HTTP_PUT,
+ [[{
+ "uri": "/v1/messages",
+ "plugins": {
+ "ai-proxy": {
+ "provider": "openai",
+ "auth": {
+ "header": {
+ "Authorization": "Bearer token"
+ }
+ },
+ "options": {
+ "model": "claude-3-5-sonnet-20241022",
+ "stream": true
+ },
+ "override": {
+ "endpoint":
"http://localhost:6724/v1/chat/completions"
+ }
+ }
+ }
+ }]]
+ )
+
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+--- response_body
+passed
+
+
+
+=== TEST 8: Send Anthropic stream request and verify SSE conversion
+--- http_config
+ server {
+ server_name openai;
+ listen 6724;
+
+ default_type 'application/json';
+
+ location /v1/chat/completions {
+ content_by_lua_block {
+ local json = require("toolkit.json")
+ ngx.req.read_body()
+ local body = json.decode(ngx.req.get_body_data())
+
+ if not body.stream then
+ ngx.status = 400
+ ngx.say("Expected stream=true")
+ return
+ end
+
+ ngx.header["Content-Type"] = "text/event-stream"
+ ngx.say("data: " .. json.encode({
+ id = "chatcmpl-123",
+ object = "chat.completion.chunk",
+ model = "gpt-4o",
+ choices = {{ index = 0, delta = { role = "assistant"
}, finish_reason = nil }}
+ }) .. "\n")
+ ngx.flush(true)
+
+ ngx.say("data: " .. json.encode({
+ id = "chatcmpl-123",
+ object = "chat.completion.chunk",
+ model = "gpt-4o",
+ choices = {{ index = 0, delta = { content = "Hello" },
finish_reason = nil }}
+ }) .. "\n")
+ ngx.flush(true)
+
+ ngx.say("data: " .. json.encode({
+ id = "chatcmpl-123",
+ object = "chat.completion.chunk",
+ model = "gpt-4o",
+ choices = {{ index = 0, delta = { content = " world"
}, finish_reason = "stop" }},
+ usage = { prompt_tokens = 5, completion_tokens = 5,
total_tokens = 10 }
+ }) .. "\n")
+ ngx.flush(true)
+
+ ngx.say("data: [DONE]\n")
+ }
+ }
+ }
+--- config
+ location /t {
+ content_by_lua_block {
+ local http = require("resty.http")
+ local httpc = http.new()
+ local core = require("apisix.core")
+
+ local ok, err = httpc:connect({
+ scheme = "http",
+ host = "localhost",
+ port = ngx.var.server_port,
+ })
+
+ local res, err = httpc:request({
+ method = "POST",
+ path = "/v1/messages",
+ headers = { ["Content-Type"] = "application/json",
["Connection"] = "close" },
+ body = [[{
+ "model": "claude-3-5-sonnet-20241022",
+ "messages": [{"role": "user", "content": "Hi"}],
+ "stream": true
+ }]],
+ })
+
+ local results = {}
+ while true do
+ local chunk, err = res.body_reader()
+ if not chunk then break end
+ table.insert(results, chunk)
+ end
+
+ ngx.print(table.concat(results, ""))
+ }
+ }
+--- error_code: 200
+--- response_body eval
+qr/event: message_start\ndata:.*?"type":"message_start".*?event:
content_block_start\ndata:.*?event:
content_block_delta\ndata:.*?"text":"Hello".*?event:
content_block_delta\ndata:.*?"text":" world".*?event:
content_block_stop\ndata:.*?event: message_delta\ndata:.*?event:
message_stop\ndata:/s
+
+
+
+=== TEST 9: Set up route for system prompt conversion test
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/routes/1',
+ ngx.HTTP_PUT,
+ [[{
+ "uri": "/v1/messages",
+ "plugins": {
+ "ai-proxy": {
+ "provider": "openai",
+ "auth": {
+ "header": {
+ "Authorization": "Bearer token"
+ }
+ },
+ "override": {
+ "endpoint":
"http://localhost:6724/v1/chat/completions"
+ }
+ }
+ }
+ }]]
+ )
+
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+--- response_body
+passed
+
+
+
+=== TEST 10: System prompt is converted to OpenAI messages[0] with role=system
+--- http_config
+ server {
+ server_name openai;
+ listen 6724;
+
+ default_type 'application/json';
+
+ location /v1/chat/completions {
+ content_by_lua_block {
+ local json = require("cjson.safe")
+ ngx.req.read_body()
+ local body, err = json.decode(ngx.req.get_body_data())
+
+ if not body or not body.messages then
+ ngx.status = 400
+ ngx.say([[{"error": "no messages"}]])
+ return
+ end
+
+ -- Verify that system prompt has been converted to first
message
+ local first_msg = body.messages[1]
+ if not first_msg or first_msg.role ~= "system" then
+ ngx.status = 400
+ ngx.say([[{"error": "system message not found or wrong
role"}]])
+ return
+ end
+
+ if first_msg.content ~= "You are a helpful assistant." then
+ ngx.status = 400
+ ngx.say([[{"error": "system message content mismatch:
]] .. (first_msg.content or "") .. [["}]])
+ return
+ end
+
+ local second_msg = body.messages[2]
+ if not second_msg or second_msg.role ~= "user" then
+ ngx.status = 400
+ ngx.say([[{"error": "user message not in correct
position"}]])
+ return
+ end
+
+ ngx.status = 200
+ ngx.say([[{
+ "id": "chatcmpl-sys",
+ "object": "chat.completion",
+ "model": "gpt-4o",
+ "choices": [{
+ "index": 0,
+ "message": {"role": "assistant", "content":
"system prompt ok"},
+ "finish_reason": "stop"
+ }],
+ "usage": {"prompt_tokens": 10, "completion_tokens": 5,
"total_tokens": 15}
+ }]])
+ }
+ }
+ }
+--- config
+ location /t {
+ content_by_lua_block {
+ local http = require("resty.http")
+ local httpc = http.new()
+
+ local ok, err = httpc:connect({
+ scheme = "http",
+ host = "localhost",
+ port = ngx.var.server_port,
+ })
+
+ local res, err = httpc:request({
+ method = "POST",
+ path = "/v1/messages",
+ headers = { ["Content-Type"] = "application/json",
["Connection"] = "close" },
+ body = [[{
+ "model": "claude-3-5-sonnet-20241022",
+ "system": "You are a helpful assistant.",
+ "messages": [{"role": "user", "content": "hello"}]
+ }]],
+ })
+
+ local body = res:read_body()
+ ngx.status = res.status
+ ngx.print(body)
+ }
+ }
+--- error_code: 200
+--- response_body eval
+qr/"text":"system prompt ok"/
+
+
+
+=== TEST 11: Set up route for tool calling conversion test
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/routes/1',
+ ngx.HTTP_PUT,
+ [[{
+ "uri": "/v1/messages",
+ "plugins": {
+ "ai-proxy": {
+ "provider": "openai",
+ "auth": {
+ "header": {
+ "Authorization": "Bearer token"
+ }
+ },
+ "override": {
+ "endpoint":
"http://localhost:6724/v1/chat/completions"
+ }
+ }
+ }
+ }]]
+ )
+
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+--- response_body
+passed
+
+
+
+=== TEST 12: Tool calling request/response conversion (Anthropic <-> OpenAI)
+--- request
+POST /v1/messages
+{"model":"claude-3-5-sonnet-20241022","messages":[{"role":"user","content":"What
is the weather in Paris?"}],"tools":[{"name":"get_weather","description":"Get
weather","input_schema":{"type":"object","properties":{"location":{"type":"string"}},"required":["location"]}}]}
+--- more_headers
+Authorization: Bearer token
+test-type: tools
+--- error_code: 200
+--- response_body eval
+qr/(?=.*"stop_reason":"tool_use")(?=.*"type":"tool_use")(?=.*"name":"get_weather")/s
+
+
+
+=== TEST 13: Set up route for null finish_reason test
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/routes/1',
+ ngx.HTTP_PUT,
+ [[{
+ "uri": "/v1/messages",
+ "plugins": {
+ "ai-proxy": {
+ "provider": "openai",
+ "auth": {
+ "header": {
+ "Authorization": "Bearer token"
+ }
+ },
+ "options": {
+ "model": "gpt-4o",
+ "stream": true
+ },
+ "override": {
+ "endpoint":
"http://localhost:6724/v1/chat/completions"
+ }
+ }
+ }
+ }]]
+ )
+
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+--- response_body
+passed
+
+
+
+=== TEST 14: message_stop emitted only once (finish_reason as JSON null must not trigger end events)
+--- http_config
+ server {
+ server_name openai;
+ listen 6724;
+
+ default_type 'application/json';
+
+ location /v1/chat/completions {
+ content_by_lua_block {
+ ngx.header["Content-Type"] = "text/event-stream"
+
+ -- chunk 1: role only, finish_reason = null (JSON null)
+ ngx.say('data:
{"id":"chatcmpl-null","object":"chat.completion.chunk","model":"gpt-4o","choices":[{"index":0,"delta":{"role":"assistant"},"finish_reason":null}]}'
.. "\n")
+ ngx.flush(true)
+
+ -- chunk 2: content delta, finish_reason = null (JSON null)
+ ngx.say('data:
{"id":"chatcmpl-null","object":"chat.completion.chunk","model":"gpt-4o","choices":[{"index":0,"delta":{"content":"Hi"},"finish_reason":null}]}'
.. "\n")
+ ngx.flush(true)
+
+ -- chunk 3: final, finish_reason = "stop"
+ ngx.say('data:
{"id":"chatcmpl-null","object":"chat.completion.chunk","model":"gpt-4o","choices":[{"index":0,"delta":{"content":"!"},"finish_reason":"stop"}],"usage":{"prompt_tokens":5,"completion_tokens":3,"total_tokens":8}}'
.. "\n")
+ ngx.flush(true)
+
+ ngx.say("data: [DONE]\n")
+ }
+ }
+ }
+--- config
+ location /t {
+ content_by_lua_block {
+ local http = require("resty.http")
+ local httpc = http.new()
+
+ local ok, err = httpc:connect({
+ scheme = "http",
+ host = "localhost",
+ port = ngx.var.server_port,
+ })
+
+ local res, err = httpc:request({
+ method = "POST",
+ path = "/v1/messages",
+ headers = { ["Content-Type"] = "application/json",
["Connection"] = "close" },
+ body = [[{
+ "model": "gpt-4o",
+ "messages": [{"role": "user", "content": "Hi"}],
+ "stream": true
+ }]],
+ })
+
+ local results = {}
+ while true do
+ local chunk, err = res.body_reader()
+ if not chunk then break end
+ table.insert(results, chunk)
+ end
+
+ local body = table.concat(results, "")
+
+ -- count occurrences of message_stop
+ local count = 0
+ for _ in body:gmatch('"type":"message_stop"') do
+ count = count + 1
+ end
+
+ -- also verify the final content delta and message_delta were
emitted
+ local has_final_content = body:find('"text":"!"', 1, true) ~= nil
+ local has_message_delta = body:find('"type":"message_delta"', 1,
true) ~= nil
+
+ if count ~= 1 then
+ ngx.say("FAIL: message_stop appeared " .. count .. " times,
expected 1")
+ elseif not has_final_content then
+ ngx.say("FAIL: final content '!' not found in body")
+ elseif not has_message_delta then
+ ngx.say("FAIL: message_delta event not found in body")
+ else
+ ngx.say("OK: message_stop appeared exactly once")
+ end
+ }
+ }
+--- error_code: 200
+--- response_body
+OK: message_stop appeared exactly once
+
+
+
+=== TEST 15: Set up route for OpenRouter-style double finish_reason chunk test
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/routes/1',
+ ngx.HTTP_PUT,
+ [[{
+ "uri": "/v1/messages",
+ "plugins": {
+ "ai-proxy": {
+ "provider": "openai",
+ "auth": {
+ "header": {
+ "Authorization": "Bearer token"
+ }
+ },
+ "options": {
+ "model": "gpt-4o",
+ "stream": true
+ },
+ "override": {
+ "endpoint":
"http://localhost:6724/v1/chat/completions"
+ }
+ }
+ }
+ }]]
+ )
+
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+--- response_body
+passed
+
+
+
+=== TEST 16: OpenRouter sends two finish_reason chunks — message_stop must appear exactly once, no empty content_block_delta
+--- http_config
+ server {
+ server_name openai;
+ listen 6724;
+
+ default_type 'application/json';
+
+ location /v1/chat/completions {
+ content_by_lua_block {
+ ngx.header["Content-Type"] = "text/event-stream"
+
+ -- chunk 1: first content token
+ ngx.say('data:
{"id":"gen-1","object":"chat.completion.chunk","model":"openai/gpt-4o","choices":[{"index":0,"delta":{"content":"Hi","role":"assistant"},"finish_reason":null,"native_finish_reason":null}]}'
.. "\n")
+ ngx.flush(true)
+
+ -- chunk 2: second content token
+ ngx.say('data:
{"id":"gen-1","object":"chat.completion.chunk","model":"openai/gpt-4o","choices":[{"index":0,"delta":{"content":"!","role":"assistant"},"finish_reason":null,"native_finish_reason":null}]}'
.. "\n")
+ ngx.flush(true)
+
+ -- chunk 3: finish_reason=stop, empty content, NO usage
(OpenRouter first stop chunk)
+ ngx.say('data:
{"id":"gen-1","object":"chat.completion.chunk","model":"openai/gpt-4o","choices":[{"index":0,"delta":{"content":"","role":"assistant"},"finish_reason":"stop","native_finish_reason":"stop"}]}'
.. "\n")
+ ngx.flush(true)
+
+ -- chunk 4: finish_reason=stop, empty content, WITH usage
(OpenRouter second stop chunk)
+ ngx.say('data:
{"id":"gen-1","object":"chat.completion.chunk","model":"openai/gpt-4o","choices":[{"index":0,"delta":{"content":"","role":"assistant"},"finish_reason":"stop","native_finish_reason":"stop"}],"usage":{"prompt_tokens":8,"completion_tokens":2,"total_tokens":10}}'
.. "\n")
+ ngx.flush(true)
+
+ ngx.say("data: [DONE]\n")
+ }
+ }
+ }
+--- config
+ location /t {
+ content_by_lua_block {
+ local http = require("resty.http")
+ local httpc = http.new()
+
+ local ok, err = httpc:connect({
+ scheme = "http",
+ host = "localhost",
+ port = ngx.var.server_port,
+ })
+
+ local res, err = httpc:request({
+ method = "POST",
+ path = "/v1/messages",
+ headers = { ["Content-Type"] = "application/json",
["Connection"] = "close" },
+ body = [[{
+ "model": "gpt-4o",
+ "messages": [{"role": "user", "content": "Hi"}],
+ "stream": true
+ }]],
+ })
+
+ local results = {}
+ while true do
+ local chunk, err = res.body_reader()
+ if not chunk then break end
+ table.insert(results, chunk)
+ end
+
+ local body = table.concat(results, "")
+
+ -- Count message_stop occurrences (must be exactly 1)
+ local stop_count = 0
+ for _ in body:gmatch('"type":"message_stop"') do
+ stop_count = stop_count + 1
+ end
+
+ -- Verify no empty text_delta events are emitted (finish_reason
chunks with
+ -- empty content must not produce content_block_delta events).
+ -- content_block_start legitimately contains text:"", so we check
the
+ -- per-event data for content_block_delta specifically.
+ local empty_delta_count = 0
+ for event_data in body:gmatch('event: content_block_delta\ndata:
([^\n]+)') do
+ local decoded = require("cjson.safe").decode(event_data)
+ if decoded and decoded.delta and decoded.delta.text == "" then
+ empty_delta_count = empty_delta_count + 1
+ end
+ end
+
+ -- Verify content tokens are present
+ local has_hi = body:find('"text":"Hi"', 1, true) ~= nil
+ local has_bang = body:find('"text":"!"', 1, true) ~= nil
+
+ if stop_count ~= 1 then
+ ngx.say("FAIL: message_stop appeared " .. stop_count .. "
times, expected 1")
+ elseif empty_delta_count > 0 then
+ ngx.say("FAIL: found " .. empty_delta_count .. " empty
text_delta(s), expected 0")
+ elseif not has_hi then
+ ngx.say("FAIL: content 'Hi' not found")
+ elseif not has_bang then
+ ngx.say("FAIL: content '!' not found")
+ else
+ ngx.say("OK: two finish_reason chunks handled correctly")
+ end
+ }
+ }
+--- error_code: 200
+--- response_body
+OK: two finish_reason chunks handled correctly
+
+
+
+=== TEST 17: Set up route for DeepSeek-style usage:null crash test
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/routes/1',
+ ngx.HTTP_PUT,
+ [[{
+ "uri": "/v1/messages",
+ "plugins": {
+ "ai-proxy": {
+ "provider": "openai",
+ "auth": {
+ "header": {
+ "Authorization": "Bearer token"
+ }
+ },
+ "options": {
+ "model": "deepseek-chat",
+ "stream": true
+ },
+ "override": {
+ "endpoint":
"http://localhost:6724/v1/chat/completions"
+ }
+ }
+ }
+ }]]
+ )
+
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+--- response_body
+passed
+
+
+
+=== TEST 18: DeepSeek sends usage:null on non-final chunks — must not crash, content must be preserved
+--- http_config
+ server {
+ server_name openai;
+ listen 6724;
+
+ default_type 'application/json';
+
+ location /v1/chat/completions {
+ content_by_lua_block {
+ ngx.header["Content-Type"] = "text/event-stream"
+
+ -- chunk 1: role only, usage:null (DeepSeek pattern)
+ ngx.say('data:
{"id":"ds-1","object":"chat.completion.chunk","model":"deepseek-chat","choices":[{"index":0,"delta":{"role":"assistant","content":""},"finish_reason":null}],"usage":null}'
.. "\n")
+ ngx.flush(true)
+
+ -- chunk 2: first content, usage:null
+ ngx.say('data:
{"id":"ds-1","object":"chat.completion.chunk","model":"deepseek-chat","choices":[{"index":0,"delta":{"content":"Hi"},"finish_reason":null}],"usage":null}'
.. "\n")
+ ngx.flush(true)
+
+ -- chunk 3: second content, usage:null
+ ngx.say('data:
{"id":"ds-1","object":"chat.completion.chunk","model":"deepseek-chat","choices":[{"index":0,"delta":{"content":"!"},"finish_reason":null}],"usage":null}'
.. "\n")
+ ngx.flush(true)
+
+ -- chunk 4: final, finish_reason=stop, usage populated
+ ngx.say('data:
{"id":"ds-1","object":"chat.completion.chunk","model":"deepseek-chat","choices":[{"index":0,"delta":{"content":""},"finish_reason":"stop"}],"usage":{"prompt_tokens":5,"completion_tokens":2,"total_tokens":7}}'
.. "\n")
+ ngx.flush(true)
+
+ ngx.say("data: [DONE]\n")
+ }
+ }
+ }
+--- config
+ location /t {
+ content_by_lua_block {
+ local http = require("resty.http")
+ local httpc = http.new()
+
+ local ok, err = httpc:connect({
+ scheme = "http",
+ host = "localhost",
+ port = ngx.var.server_port,
+ })
+
+ local res, err = httpc:request({
+ method = "POST",
+ path = "/v1/messages",
+ headers = { ["Content-Type"] = "application/json",
["Connection"] = "close" },
+ body = [[{
+ "model": "deepseek-chat",
+ "messages": [{"role": "user", "content": "Hi"}],
+ "stream": true
+ }]],
+ })
+
+ local results = {}
+ while true do
+ local chunk, err = res.body_reader()
+ if not chunk then break end
+ table.insert(results, chunk)
+ end
+
+ local body = table.concat(results, "")
+
+ local stop_count = 0
+ for _ in body:gmatch('"type":"message_stop"') do
+ stop_count = stop_count + 1
+ end
+
+ local has_hi = body:find('"text":"Hi"', 1, true) ~= nil
+ local has_bang = body:find('"text":"!"', 1, true) ~= nil
+ local has_message_start = body:find('event: message_start', 1,
true) ~= nil
+
+ if not has_message_start then
+ ngx.say("FAIL: message_start not found (possible 500 crash)")
+ elseif stop_count ~= 1 then
+ ngx.say("FAIL: message_stop appeared " .. stop_count .. "
times, expected 1")
+ elseif not has_hi then
+ ngx.say("FAIL: content 'Hi' not found")
+ elseif not has_bang then
+ ngx.say("FAIL: content '!' not found")
+ else
+ ngx.say("OK: DeepSeek usage:null chunks handled correctly")
+ end
+ }
+ }
+--- error_code: 200
+--- response_body
+OK: DeepSeek usage:null chunks handled correctly
+
+
+
+=== TEST 19: Set up route for first-chunk role+content test (OpenRouter pattern)
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/routes/1',
+ ngx.HTTP_PUT,
+ [[{
+ "uri": "/v1/messages",
+ "plugins": {
+ "ai-proxy": {
+ "provider": "openai",
+ "auth": {
+ "header": {
+ "Authorization": "Bearer token"
+ }
+ },
+ "options": {
+ "model": "gpt-4o",
+ "stream": true
+ },
+ "override": {
+ "endpoint":
"http://localhost:6724/v1/chat/completions"
+ }
+ }
+ }
+ }]]
+ )
+
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+--- response_body
+passed
+
+
+
+=== TEST 20: First chunk contains both role and content simultaneously — content must not be lost
+--- http_config
+ server {
+ server_name openai;
+ listen 6724;
+
+ default_type 'application/json';
+
+ location /v1/chat/completions {
+ content_by_lua_block {
+ ngx.header["Content-Type"] = "text/event-stream"
+
+ -- chunk 1: role AND content in the same delta (OpenRouter
pattern)
+ ngx.say('data:
{"id":"gen-1","object":"chat.completion.chunk","model":"openai/gpt-4o","choices":[{"index":0,"delta":{"content":"hello","role":"assistant"},"finish_reason":null}]}'
.. "\n")
+ ngx.flush(true)
+
+ -- chunk 2: more content
+ ngx.say('data:
{"id":"gen-1","object":"chat.completion.chunk","model":"openai/gpt-4o","choices":[{"index":0,"delta":{"content":"
world"},"finish_reason":null}]}' .. "\n")
+ ngx.flush(true)
+
+ -- chunk 3: final, finish_reason=stop, empty content, with
usage
+ ngx.say('data:
{"id":"gen-1","object":"chat.completion.chunk","model":"openai/gpt-4o","choices":[{"index":0,"delta":{"content":""},"finish_reason":"stop"}],"usage":{"prompt_tokens":8,"completion_tokens":2,"total_tokens":10}}'
.. "\n")
+ ngx.flush(true)
+
+ ngx.say("data: [DONE]\n")
+ }
+ }
+ }
+--- config
+ location /t {
+ content_by_lua_block {
+ local http = require("resty.http")
+ local httpc = http.new()
+
+ local ok, err = httpc:connect({
+ scheme = "http",
+ host = "localhost",
+ port = ngx.var.server_port,
+ })
+
+ local res, err = httpc:request({
+ method = "POST",
+ path = "/v1/messages",
+ headers = { ["Content-Type"] = "application/json",
["Connection"] = "close" },
+ body = [[{
+ "model": "gpt-4o",
+ "messages": [{"role": "user", "content": "Hi"}],
+ "stream": true
+ }]],
+ })
+
+ local results = {}
+ while true do
+ local chunk, err = res.body_reader()
+ if not chunk then break end
+ table.insert(results, chunk)
+ end
+
+ local body = table.concat(results, "")
+
+ local has_hello = body:find('"hello"', 1, true) ~= nil
+ local has_world = body:find('" world"', 1, true) ~= nil
+ local stop_count = 0
+ for _ in body:gmatch('"type":"message_stop"') do
+ stop_count = stop_count + 1
+ end
+
+ if not has_hello then
+ ngx.say("FAIL: content 'hello' lost from first chunk")
+ elseif not has_world then
+ ngx.say("FAIL: content ' world' lost from second chunk")
+ elseif stop_count ~= 1 then
+ ngx.say("FAIL: message_stop appeared " .. stop_count .. "
times, expected 1")
+ else
+ ngx.say("OK: first-chunk role+content preserved correctly")
+ end
+ }
+ }
+--- error_code: 200
+--- response_body
+OK: first-chunk role+content preserved correctly
+
+
+
+=== TEST 21: sse.encode output must end with \n\n (SSE spec requires blank-line event terminator)
+--- config
+ location /t {
+ content_by_lua_block {
+ local sse = require("apisix.plugins.ai-transport.sse")
+
+ -- Test a named event (e.g. message_stop)
+ local out = sse.encode({ type = "message_stop", data =
'{"type":"message_stop"}' })
+ if out:sub(-2) ~= "\n\n" then
+ ngx.say("FAIL: named event does not end with \\n\\n, got: " ..
+ string.format("%q", out:sub(-4)))
+ return
+ end
+
+ -- Test a plain data event (type == "message", no event: line)
+ local out2 = sse.encode({ type = "message", data = '{"foo":"bar"}'
})
+ if out2:sub(-2) ~= "\n\n" then
+ ngx.say("FAIL: data-only event does not end with \\n\\n, got:
" ..
+ string.format("%q", out2:sub(-4)))
+ return
+ end
+
+ ngx.say("OK: sse.encode output ends with \\n\\n")
+ }
+ }
+--- response_body
+OK: sse.encode output ends with \n\n
+
+
+
+=== TEST 22: empty SSE data frames between real chunks must not trigger JSON decode warnings
+--- http_config
+ server {
+ server_name openai;
+ listen 6724;
+
+ default_type 'application/json';
+
+ location /v1/chat/completions {
+ content_by_lua_block {
+ ngx.header["Content-Type"] = "text/event-stream"
+
+ -- chunk 1: real content
+ ngx.say('data:
{"id":"chatcmpl-empty","object":"chat.completion.chunk","model":"gpt-4o","choices":[{"index":0,"delta":{"role":"assistant","content":"Hi"},"finish_reason":null}]}'
.. "\n")
+ ngx.flush(true)
+
+ -- empty data frame (blank line between events)
+ ngx.say("data: \n")
+ ngx.flush(true)
+
+ -- chunk 2: final with finish_reason=stop
+ ngx.say('data:
{"id":"chatcmpl-empty","object":"chat.completion.chunk","model":"gpt-4o","choices":[{"index":0,"delta":{"content":""},"finish_reason":"stop"}],"usage":{"prompt_tokens":3,"completion_tokens":1,"total_tokens":4}}'
.. "\n")
+ ngx.flush(true)
+
+ ngx.say("data: [DONE]\n")
+ }
+ }
+ }
+--- config
+ location /t {
+ content_by_lua_block {
+ local http = require("resty.http")
+ local httpc = http.new()
+
+ local ok, err = httpc:connect({
+ scheme = "http",
+ host = "localhost",
+ port = ngx.var.server_port,
+ })
+
+ local res, err = httpc:request({
+ method = "POST",
+ path = "/v1/messages",
+ headers = { ["Content-Type"] = "application/json",
["Connection"] = "close" },
+ body = [[{
+ "model": "gpt-4o",
+ "messages": [{"role": "user", "content": "Hi"}],
+ "stream": true
+ }]],
+ })
+
+ local results = {}
+ while true do
+ local chunk, err = res.body_reader()
+ if not chunk then break end
+ table.insert(results, chunk)
+ end
+
+ local body = table.concat(results, "")
+ local has_content = body:find('"text":"Hi"', 1, true) ~= nil
+ local has_stop = body:find('"type":"message_stop"', 1, true) ~= nil
+
+ if not has_content then
+ ngx.say("FAIL: content 'Hi' not found")
+ elseif not has_stop then
+ ngx.say("FAIL: message_stop not found")
+ else
+ ngx.say("OK: empty data frame handled without error")
+ end
+ }
+ }
+--- error_code: 200
+--- response_body
+OK: empty data frame handled without error
+--- no_error_log
+failed to decode SSE data
+
+
+
+=== TEST 23: sse.encode handles edge cases correctly
+--- config
+ location /t {
+ content_by_lua_block {
+ local sse = require("apisix.plugins.ai-transport.sse")
+
+ -- empty string data: should still produce a valid SSE frame
ending with \n\n
+ local out1 = sse.encode({ type = "content_block_delta", data = ""
})
+ if out1:sub(-2) ~= "\n\n" then
+ ngx.say("FAIL: empty data does not end with \\n\\n")
+ return
+ end
+
+ -- large payload: must not be truncated, must end with \n\n
+ local large_data = string.rep("x", 8192)
+ local out2 = sse.encode({ type = "content_block_delta", data =
large_data })
+ if out2:sub(-2) ~= "\n\n" then
+ ngx.say("FAIL: large payload does not end with \\n\\n")
+ return
+ end
+ if not out2:find(large_data, 1, true) then
+ ngx.say("FAIL: large payload was truncated")
+ return
+ end
+
+ -- special characters in data: quotes, backslashes, newlines must
be preserved
+ local special_data =
'{"text":"line1\\nline2","quote":"\\"hello\\""}'
+ local out3 = sse.encode({ type = "content_block_delta", data =
special_data })
+ if out3:sub(-2) ~= "\n\n" then
+ ngx.say("FAIL: special-char data does not end with \\n\\n")
+ return
+ end
+ if not out3:find(special_data, 1, true) then
+ ngx.say("FAIL: special characters were mangled")
+ return
+ end
+
+ ngx.say("OK: sse.encode edge cases passed")
+ }
+ }
+--- response_body
+OK: sse.encode edge cases passed
+
+
+
+=== TEST 24: Set up route for usage-only final chunk test
+--- config
+ location /t {
+ content_by_lua_block {
+ local test_admin = require("lib.test_admin").test
+
+ -- Route /v1/messages through ai-proxy (provider "openai", streaming) to the
+ -- mock chat-completions endpoint listening on port 6724. The admin API
+ -- response body is echoed so the test can match "passed"; any status of
+ -- 300 or above is propagated as this location's status.
+ local route = [[{
+ "uri": "/v1/messages",
+ "plugins": {
+ "ai-proxy": {
+ "provider": "openai",
+ "auth": {
+ "header": {
+ "Authorization": "Bearer token"
+ }
+ },
+ "options": {
+ "model": "gpt-4o",
+ "stream": true
+ },
+ "override": {
+ "endpoint":
"http://localhost:6724/v1/chat/completions"
+ }
+ }
+ }
+ }]]
+
+ local status, resp_body = test_admin('/apisix/admin/routes/1', ngx.HTTP_PUT, route)
+ if status >= 300 then
+     ngx.status = status
+ end
+ ngx.say(resp_body)
+ }
+ }
+--- response_body
+passed
+
+
+
+=== TEST 25: usage in a separate chunk after message_stop — message_delta with usage must be emitted
+--- http_config
+ server {
+ server_name openai;
+ listen 6724;
+
+ default_type 'application/json';
+
+ location /v1/chat/completions {
+ content_by_lua_block {
+ -- Mock OpenAI chat-completions stream. It sends, in order: one content
+ -- delta, a finish chunk (finish_reason=stop, usage=null), a usage-only
+ -- chunk with an empty choices array, and finally the [DONE] sentinel.
+ -- ngx.say appends its own newline, so the explicit "\n" ends each SSE
+ -- event with a blank line; ngx.flush(true) pushes each chunk out
+ -- separately so the proxy sees them as distinct reads.
+ ngx.header["Content-Type"] = "text/event-stream"
+
+ -- chunk 1: content
+ ngx.say('data:
{"id":"cmpl-1","object":"chat.completion.chunk","model":"gpt-4o","choices":[{"index":0,"delta":{"role":"assistant","content":"Hi"},"finish_reason":null}],"usage":null}'
.. "\n")
+ ngx.flush(true)
+
+ -- chunk 2: finish_reason=stop, usage=null (usage not yet
available)
+ ngx.say('data:
{"id":"cmpl-1","object":"chat.completion.chunk","model":"gpt-4o","choices":[{"index":0,"delta":{},"finish_reason":"stop"}],"usage":null}'
.. "\n")
+ ngx.flush(true)
+
+ -- chunk 3: usage-only chunk with no choices (sent after
message_stop)
+ ngx.say('data:
{"id":"cmpl-1","object":"chat.completion.chunk","model":"gpt-4o","choices":[],"usage":{"prompt_tokens":10,"completion_tokens":5,"total_tokens":15}}'
.. "\n")
+ ngx.flush(true)
+
+ -- OpenAI end-of-stream sentinel
+ ngx.say("data: [DONE]\n")
+ }
+ }
+ }
+--- config
+ location /t {
+ content_by_lua_block {
+ local http = require("resty.http")
+ local httpc = http.new()
+
+ -- Send a streaming Anthropic Messages request through the proxy and
+ -- collect the converted SSE stream in full. Connection and request
+ -- failures are reported as explicit FAIL lines instead of crashing on a
+ -- nil `res` (which would surface only as an opaque 500).
+ local ok, conn_err = httpc:connect({
+     scheme = "http",
+     host = "localhost",
+     port = ngx.var.server_port,
+ })
+ if not ok then
+     ngx.say("FAIL: connect failed: ", conn_err)
+     return
+ end
+
+ local res, req_err = httpc:request({
+     method = "POST",
+     path = "/v1/messages",
+     headers = { ["Content-Type"] = "application/json", ["Connection"] = "close" },
+     body = [[{
+ "model": "gpt-4o",
+ "messages": [{"role": "user", "content": "Hi"}],
+ "stream": true
+ }]],
+ })
+ if not res then
+     ngx.say("FAIL: request failed: ", req_err)
+     return
+ end
+
+ -- Drain the response until the reader yields no more data.
+ local results = {}
+ while true do
+     local chunk = res.body_reader()
+     if not chunk then break end
+     results[#results + 1] = chunk
+ end
+
+ local body = table.concat(results, "")
+
+ -- message_stop must appear exactly once, even though the upstream sends a
+ -- trailing usage-only chunk after the finish_reason=stop chunk.
+ local stop_count = 0
+ for _ in body:gmatch('"type":"message_stop"') do
+     stop_count = stop_count + 1
+ end
+
+ -- The trailing chunk's usage (prompt_tokens=10, completion_tokens=5) must
+ -- surface in the stream as input_tokens/output_tokens.
+ local has_usage = body:find('"input_tokens":10', 1, true) ~= nil
+     and body:find('"output_tokens":5', 1, true) ~= nil
+
+ if stop_count ~= 1 then
+     ngx.say("FAIL: message_stop appeared " .. stop_count .. " times, expected 1")
+ elseif not has_usage then
+     ngx.say("FAIL: usage (input_tokens=10, output_tokens=5) not found in stream")
+ else
+     ngx.say("OK: usage-only chunk produced message_delta with usage")
+ end
+ }
+ }
+--- error_code: 200
+--- response_body
+OK: usage-only chunk produced message_delta with usage
+--- no_error_log
+[error]
+
+
+
+=== TEST 26: Anthropic SSE error event should be logged at warn level
+--- config
+ location /t {
+ content_by_lua_block {
+ local anthropic =
+     require("apisix.plugins.ai-protocols.anthropic-messages")
+
+ -- Feed a raw Anthropic `error` SSE event straight into the parser, with a
+ -- minimal ctx ({var = {}}) and an empty third argument. The test expects
+ -- the parsed result's type to be "done" and the error to be logged.
+ local error_event = {
+     type = "error",
+     data = '{"type":"error","error":{"type":"overloaded_error","message":"Overloaded"}}',
+ }
+ local parsed = anthropic.parse_sse_event(error_event, { var = {} }, {})
+ ngx.say("type: " .. parsed.type)
+ }
+ }
+--- response_body
+type: done
+--- error_log
+Anthropic SSE error: type=overloaded_error, message=Overloaded