This is an automated email from the ASF dual-hosted git repository.

nic443 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new da45cbe48 fix(ai-proxy): return 502 when streaming converter receives mismatched response format (#13229)
da45cbe48 is described below

commit da45cbe482b6f621ebd805c031f4b6bf1fe89b65
Author: Nic <[email protected]>
AuthorDate: Thu Apr 16 15:51:57 2026 +0800

    fix(ai-proxy): return 502 when streaming converter receives mismatched response format (#13229)
---
 apisix/plugins/ai-providers/base.lua    |  30 ++++---
 t/plugin/ai-proxy-protocol-conversion.t | 147 ++++++++++++++++++++++++++++++++
 2 files changed, 163 insertions(+), 14 deletions(-)

diff --git a/apisix/plugins/ai-providers/base.lua b/apisix/plugins/ai-providers/base.lua
index 4a69f9eaf..b037924ee 100644
--- a/apisix/plugins/ai-providers/base.lua
+++ b/apisix/plugins/ai-providers/base.lua
@@ -266,6 +266,11 @@ function _M.parse_streaming_response(self, ctx, res, target_proto, converter)
     local contents = {}
     local sse_state = { is_first = true }
     local sse_buf = ""
+    -- Track whether any output was sent to the client.
+    -- When a converter is active but the upstream returns a different SSE format,
+    -- all events may be skipped and no output produced, leaving the response
+    -- uncommitted and causing nginx to fall through to the balancer phase.
+    local output_sent = false
 
     while true do
         local chunk, err = body_reader()
@@ -276,21 +281,17 @@ function _M.parse_streaming_response(self, ctx, res, target_proto, converter)
             return transport_http.handle_error(err)
         end
         if not chunk then
-            if #sse_buf == 0 then
-                return
+            if #sse_buf > 0 then
+                core.log.warn("dropping incomplete SSE frame at EOF, size: ",
+                              #sse_buf)
             end
-            -- EOF with buffered remainder — process it and exit
-            local events = sse.decode(sse_buf)
-            for _, event in ipairs(events) do
-                local parsed = target_proto.parse_sse_event(event, ctx, sse_state)
-                if parsed and parsed.type ~= "skip" then
-                    if parsed.usage then
-                        merge_usage(ctx, parsed)
-                        ctx.var.llm_prompt_tokens = ctx.ai_token_usage.prompt_tokens
-                        ctx.var.llm_completion_tokens = ctx.ai_token_usage.completion_tokens
-                        ctx.var.llm_response_text = table.concat(contents, "")
-                    end
-                end
+
+            if converter and not output_sent then
+                local msg = "streaming response completed without producing "
+                            .. "any output; the upstream likely returned a "
+                            .. "different SSE format than the converter expects"
+                core.log.error(msg)
+                return 502, msg
             end
             return
         end
@@ -355,6 +356,7 @@ function _M.parse_streaming_response(self, ctx, res, target_proto, converter)
         if converter then
             for _, c in ipairs(converted_chunks) do
                 plugin.lua_response_filter(ctx, res.headers, c)
+                output_sent = true
             end
         else
             plugin.lua_response_filter(ctx, res.headers, chunk)
diff --git a/t/plugin/ai-proxy-protocol-conversion.t b/t/plugin/ai-proxy-protocol-conversion.t
index a6ee0f2e2..65f43ee92 100644
--- a/t/plugin/ai-proxy-protocol-conversion.t
+++ b/t/plugin/ai-proxy-protocol-conversion.t
@@ -1382,3 +1382,150 @@ OK: usage-only chunk produced message_delta with usage
 type: done
 --- error_log
 Anthropic SSE error: type=overloaded_error, message=Overloaded
+
+
+
+=== TEST 27: Set up route for response format mismatch test – openai-compatible provider with Anthropic override endpoint
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                    "uri": "/v1/messages",
+                    "plugins": {
+                        "ai-proxy": {
+                            "provider": "openai-compatible",
+                            "auth": {
+                                "header": {
+                                    "Authorization": "Bearer token"
+                                }
+                            },
+                            "options": {
+                                "model": "test-model"
+                            },
+                            "override": {
+                                "endpoint": "http://localhost:6730/v1/messages"
+                            }
+                        }
+                    }
+                }]]
+            )
+
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 28: Streaming 502 when converter receives mismatched upstream response format
+When the client sends Anthropic format (detected via /v1/messages URI) but the provider
+is openai-compatible (only supports openai-chat), a converter bridges the gap. If the
+upstream endpoint also returns Anthropic-format SSE (instead of OpenAI), the converter
+cannot parse any events and the gateway should return 502 instead of crashing.
+--- http_config
+        server {
+            server_name anthropic_upstream;
+            listen 6730;
+
+            default_type 'application/json';
+
+            location /v1/messages {
+                content_by_lua_block {
+                    local json = require("toolkit.json")
+                    ngx.header["Content-Type"] = "text/event-stream"
+
+                    -- Return Anthropic-format SSE events (not OpenAI format)
+                    ngx.say("event: message_start")
+                    ngx.say("data: " .. json.encode({
+                        type = "message_start",
+                        message = {
+                            id = "msg_123",
+                            type = "message",
+                            role = "assistant",
+                            model = "test-model",
+                            content = {},
+                            usage = { input_tokens = 10, output_tokens = 0 },
+                        }
+                    }))
+                    ngx.say("")
+                    ngx.flush(true)
+
+                    ngx.say("event: content_block_start")
+                    ngx.say("data: " .. json.encode({
+                        type = "content_block_start",
+                        index = 0,
+                        content_block = { type = "text", text = "" },
+                    }))
+                    ngx.say("")
+                    ngx.flush(true)
+
+                    ngx.say("event: content_block_delta")
+                    ngx.say("data: " .. json.encode({
+                        type = "content_block_delta",
+                        index = 0,
+                        delta = { type = "text_delta", text = "Hello" },
+                    }))
+                    ngx.say("")
+                    ngx.flush(true)
+
+                    ngx.say("event: content_block_stop")
+                    ngx.say("data: " .. json.encode({
+                        type = "content_block_stop",
+                        index = 0,
+                    }))
+                    ngx.say("")
+                    ngx.flush(true)
+
+                    ngx.say("event: message_delta")
+                    ngx.say("data: " .. json.encode({
+                        type = "message_delta",
+                        delta = { stop_reason = "end_turn" },
+                        usage = { output_tokens = 5 },
+                    }))
+                    ngx.say("")
+                    ngx.flush(true)
+
+                    ngx.say("event: message_stop")
+                    ngx.say("data: {}")
+                    ngx.say("")
+                }
+            }
+        }
+--- config
+    location /t {
+        content_by_lua_block {
+            local http = require("resty.http")
+            local httpc = http.new()
+
+            local ok, err = httpc:connect({
+                scheme = "http",
+                host = "localhost",
+                port = ngx.var.server_port,
+            })
+
+            local res, err = httpc:request({
+                method = "POST",
+                path = "/v1/messages",
+                headers = { ["Content-Type"] = "application/json", ["Connection"] = "close" },
+                body = [[{
+                    "model": "test-model",
+                    "messages": [{"role": "user", "content": "Hi"}],
+                    "stream": true
+                }]],
+            })
+
+            res:read_body()
+            ngx.say("status: " .. res.status)
+        }
+    }
+--- response_body
+status: 502
+--- error_log
+streaming response completed without producing any output

Reply via email to