This is an automated email from the ASF dual-hosted git repository.
nic443 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git
The following commit(s) were added to refs/heads/master by this push:
new da45cbe48 fix(ai-proxy): return 502 when streaming converter receives mismatched response format (#13229)
da45cbe48 is described below
commit da45cbe482b6f621ebd805c031f4b6bf1fe89b65
Author: Nic <[email protected]>
AuthorDate: Thu Apr 16 15:51:57 2026 +0800
fix(ai-proxy): return 502 when streaming converter receives mismatched response format (#13229)
---
apisix/plugins/ai-providers/base.lua | 30 ++++---
t/plugin/ai-proxy-protocol-conversion.t | 147 ++++++++++++++++++++++++++++++++
2 files changed, 163 insertions(+), 14 deletions(-)
diff --git a/apisix/plugins/ai-providers/base.lua b/apisix/plugins/ai-providers/base.lua
index 4a69f9eaf..b037924ee 100644
--- a/apisix/plugins/ai-providers/base.lua
+++ b/apisix/plugins/ai-providers/base.lua
@@ -266,6 +266,11 @@ function _M.parse_streaming_response(self, ctx, res, target_proto, converter)
local contents = {}
local sse_state = { is_first = true }
local sse_buf = ""
+ -- Track whether any output was sent to the client.
+ -- When a converter is active but the upstream returns a different SSE format,
+ -- all events may be skipped and no output produced, leaving the response
+ -- uncommitted and causing nginx to fall through to the balancer phase.
+ local output_sent = false
while true do
local chunk, err = body_reader()
@@ -276,21 +281,17 @@ function _M.parse_streaming_response(self, ctx, res, target_proto, converter)
return transport_http.handle_error(err)
end
if not chunk then
- if #sse_buf == 0 then
- return
+ if #sse_buf > 0 then
+ core.log.warn("dropping incomplete SSE frame at EOF, size: ",
+ #sse_buf)
end
- -- EOF with buffered remainder — process it and exit
- local events = sse.decode(sse_buf)
- for _, event in ipairs(events) do
- local parsed = target_proto.parse_sse_event(event, ctx, sse_state)
- if parsed and parsed.type ~= "skip" then
- if parsed.usage then
- merge_usage(ctx, parsed)
- ctx.var.llm_prompt_tokens = ctx.ai_token_usage.prompt_tokens
- ctx.var.llm_completion_tokens = ctx.ai_token_usage.completion_tokens
- ctx.var.llm_response_text = table.concat(contents, "")
- end
- end
+
+ if converter and not output_sent then
+ local msg = "streaming response completed without producing "
+ .. "any output; the upstream likely returned a "
+ .. "different SSE format than the converter expects"
+ core.log.error(msg)
+ return 502, msg
end
return
end
@@ -355,6 +356,7 @@ function _M.parse_streaming_response(self, ctx, res, target_proto, converter)
if converter then
for _, c in ipairs(converted_chunks) do
plugin.lua_response_filter(ctx, res.headers, c)
+ output_sent = true
end
else
plugin.lua_response_filter(ctx, res.headers, chunk)
diff --git a/t/plugin/ai-proxy-protocol-conversion.t b/t/plugin/ai-proxy-protocol-conversion.t
index a6ee0f2e2..65f43ee92 100644
--- a/t/plugin/ai-proxy-protocol-conversion.t
+++ b/t/plugin/ai-proxy-protocol-conversion.t
@@ -1382,3 +1382,150 @@ OK: usage-only chunk produced message_delta with usage
type: done
--- error_log
Anthropic SSE error: type=overloaded_error, message=Overloaded
+
+
+
+=== TEST 27: Set up route for response format mismatch test – openai-compatible provider with Anthropic override endpoint
+--- config
+ location /t {
+ content_by_lua_block {
+ local t = require("lib.test_admin").test
+ local code, body = t('/apisix/admin/routes/1',
+ ngx.HTTP_PUT,
+ [[{
+ "uri": "/v1/messages",
+ "plugins": {
+ "ai-proxy": {
+ "provider": "openai-compatible",
+ "auth": {
+ "header": {
+ "Authorization": "Bearer token"
+ }
+ },
+ "options": {
+ "model": "test-model"
+ },
+ "override": {
+ "endpoint": "http://localhost:6730/v1/messages"
+ }
+ }
+ }
+ }]]
+ )
+
+ if code >= 300 then
+ ngx.status = code
+ end
+ ngx.say(body)
+ }
+ }
+--- response_body
+passed
+
+
+
+=== TEST 28: Streaming 502 when converter receives mismatched upstream response format
+When the client sends Anthropic format (detected via /v1/messages URI) but the provider
+is openai-compatible (only supports openai-chat), a converter bridges the gap. If the
+upstream endpoint also returns Anthropic-format SSE (instead of OpenAI), the converter
+cannot parse any events and the gateway should return 502 instead of crashing.
+--- http_config
+ server {
+ server_name anthropic_upstream;
+ listen 6730;
+
+ default_type 'application/json';
+
+ location /v1/messages {
+ content_by_lua_block {
+ local json = require("toolkit.json")
+ ngx.header["Content-Type"] = "text/event-stream"
+
+ -- Return Anthropic-format SSE events (not OpenAI format)
+ ngx.say("event: message_start")
+ ngx.say("data: " .. json.encode({
+ type = "message_start",
+ message = {
+ id = "msg_123",
+ type = "message",
+ role = "assistant",
+ model = "test-model",
+ content = {},
+ usage = { input_tokens = 10, output_tokens = 0 },
+ }
+ }))
+ ngx.say("")
+ ngx.flush(true)
+
+ ngx.say("event: content_block_start")
+ ngx.say("data: " .. json.encode({
+ type = "content_block_start",
+ index = 0,
+ content_block = { type = "text", text = "" },
+ }))
+ ngx.say("")
+ ngx.flush(true)
+
+ ngx.say("event: content_block_delta")
+ ngx.say("data: " .. json.encode({
+ type = "content_block_delta",
+ index = 0,
+ delta = { type = "text_delta", text = "Hello" },
+ }))
+ ngx.say("")
+ ngx.flush(true)
+
+ ngx.say("event: content_block_stop")
+ ngx.say("data: " .. json.encode({
+ type = "content_block_stop",
+ index = 0,
+ }))
+ ngx.say("")
+ ngx.flush(true)
+
+ ngx.say("event: message_delta")
+ ngx.say("data: " .. json.encode({
+ type = "message_delta",
+ delta = { stop_reason = "end_turn" },
+ usage = { output_tokens = 5 },
+ }))
+ ngx.say("")
+ ngx.flush(true)
+
+ ngx.say("event: message_stop")
+ ngx.say("data: {}")
+ ngx.say("")
+ }
+ }
+ }
+--- config
+ location /t {
+ content_by_lua_block {
+ local http = require("resty.http")
+ local httpc = http.new()
+
+ local ok, err = httpc:connect({
+ scheme = "http",
+ host = "localhost",
+ port = ngx.var.server_port,
+ })
+
+ local res, err = httpc:request({
+ method = "POST",
+ path = "/v1/messages",
+ headers = { ["Content-Type"] = "application/json", ["Connection"] = "close" },
+ body = [[{
+ "model": "test-model",
+ "messages": [{"role": "user", "content": "Hi"}],
+ "stream": true
+ }]],
+ })
+
+ res:read_body()
+ ngx.say("status: " .. res.status)
+ }
+ }
+--- response_body
+status: 502
+--- error_log
+streaming response completed without producing any output