This is an automated email from the ASF dual-hosted git repository. nic-6443 pushed a commit to branch fix/ai-proxy-client-disconnect in repository https://gitbox.apache.org/repos/asf/apisix.git
commit 092f7dcef7be71a470cda8d28d95568ed95a5087 Author: Nic <[email protected]> AuthorDate: Sat Apr 18 14:51:43 2026 +0800 feat(ai-proxy): abort upstream read on client disconnect during streaming When a downstream client disconnects mid-stream, the proxy was continuing to read all remaining chunks from the LLM, performing SSE parsing, token counting, and protocol conversion unnecessarily. Fix by passing wait=true to lua_response_filter in the streaming path. ngx.flush(true) returns an error when the client connection is gone, at which point we close the upstream httpc connection and return early. Changes: - plugin.lua: add optional wait param to lua_response_filter; return (ok, err) so callers can detect client disconnection - ai-providers/base.lua: use wait=true in parse_streaming_response output loop; on flush failure close upstream and return immediately --- apisix/plugin.lua | 28 ++++- apisix/plugins/ai-providers/base.lua | 28 ++++- t/plugin/ai-proxy-client-disconnect.t | 209 ++++++++++++++++++++++++++++++++++ 3 files changed, 256 insertions(+), 9 deletions(-) diff --git a/apisix/plugin.lua b/apisix/plugin.lua index 723ededd0..942efebe6 100644 --- a/apisix/plugin.lua +++ b/apisix/plugin.lua @@ -1397,13 +1397,22 @@ function _M.run_global_rules(api_ctx, global_rules, conf_version, phase_name) end end -function _M.lua_response_filter(api_ctx, headers, body) +-- @param wait boolean When true, flush synchronously (ngx.flush(true)) so that a +-- disconnected client is reported as an error. Defaults to false (async flush). +-- Returns true on success, or false and an error string if print/flush fails.
+function _M.lua_response_filter(api_ctx, headers, body, wait) local plugins = api_ctx.plugins if not plugins or #plugins == 0 then -- if there is no any plugin, just print the original body to downstream - ngx_print(body) - ngx_flush() - return + local ok, err = ngx_print(body) + if not ok then + return false, err + end + ok, err = ngx_flush(wait == true) -- ngx.flush() rejects a nil argument + if not ok then + return false, err + end + return true end for i = 1, #plugins, 2 do local phase_func = plugins[i]["lua_body_filter"] @@ -1430,8 +1439,15 @@ function _M.lua_response_filter(api_ctx, headers, body) ::CONTINUE:: end - ngx_print(body) - ngx_flush() + local ok, err = ngx_print(body) + if not ok then + return false, err + end + ok, err = ngx_flush(wait == true) -- ngx.flush() rejects a nil argument + if not ok then + return false, err + end + return true end diff --git a/apisix/plugins/ai-providers/base.lua b/apisix/plugins/ai-providers/base.lua index db07dff70..8227922d6 100644 --- a/apisix/plugins/ai-providers/base.lua +++ b/apisix/plugins/ai-providers/base.lua @@ -352,14 +352,36 @@ function _M.parse_streaming_response(self, ctx, res, target_proto, converter) ::CONTINUE:: end - -- Output: converter events or passthrough raw chunk + -- Output: converter events or passthrough raw chunk. + -- Pass wait=true for synchronous flush so we can detect client disconnection.
if converter then for _, c in ipairs(converted_chunks) do - plugin.lua_response_filter(ctx, res.headers, c) + local ok, flush_err = plugin.lua_response_filter(ctx, res.headers, c, true) output_sent = true + if not ok then + core.log.info("client disconnected during AI streaming, ", + "aborting upstream read: ", flush_err) + if res._httpc then + res._httpc:close() + res._httpc = nil + end + ctx.var.llm_request_done = true + return + end end else - plugin.lua_response_filter(ctx, res.headers, chunk) + local ok, flush_err = plugin.lua_response_filter(ctx, res.headers, chunk, true) + output_sent = true + if not ok then + core.log.info("client disconnected during AI streaming, ", + "aborting upstream read: ", flush_err) + if res._httpc then + res._httpc:close() + res._httpc = nil + end + ctx.var.llm_request_done = true + return + end end end end diff --git a/t/plugin/ai-proxy-client-disconnect.t b/t/plugin/ai-proxy-client-disconnect.t new file mode 100644 index 000000000..c9750927a --- /dev/null +++ b/t/plugin/ai-proxy-client-disconnect.t @@ -0,0 +1,209 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +use t::APISIX 'no_plan'; + +log_level("info"); +repeat_each(1); +no_long_string(); +no_root_location(); + + +add_block_preprocessor(sub { + my ($block) = @_; + + if (!defined $block->request) { + $block->set_value("request", "GET /t"); + } + + # Mock upstream: slow SSE server that streams chunks until the connection + # is closed, tracking the final chunk count in the "test" shared dict. + my $http_config = $block->http_config // <<_EOC_; + server { + server_name slow_openai_sse; + listen 7750; + + default_type 'text/event-stream'; + + location /v1/chat/completions { + content_by_lua_block { + ngx.header["Content-Type"] = "text/event-stream" + local dict = ngx.shared["test"] + dict:set("upstream_chunks", 0) + -- Stream up to 2000 chunks with 30ms sleep between each. + -- The proxy should abort well before this completes when + -- the client disconnects. + for i = 1, 2000 do + local ok, err = ngx.print( + 'data: {"id":"chatcmpl-1","object":' + .. '"chat.completion.chunk","choices":[{"delta":' + .. '{"content":"tok"},"index":0,' + .. '"finish_reason":null}],"usage":null}\\n\\n') + if not ok then + return + end + local flush_ok = ngx.flush(true) + if not flush_ok then + return + end + dict:set("upstream_chunks", i) + ngx.sleep(0.03) + end + } + } + + -- Probe endpoint to read the current chunk count. 
+ location /chunks { + content_by_lua_block { + local dict = ngx.shared["test"] + ngx.say(dict:get("upstream_chunks") or 0) + } + } + } +_EOC_ + + $block->set_value("http_config", $http_config); +}); + + +run_tests(); + +__DATA__ + +=== TEST 1: set route for client disconnect test +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "uri": "/anything", + "plugins": { + "ai-proxy": { + "provider": "openai", + "auth": { + "header": { + "Authorization": "Bearer token" + } + }, + "options": { + "model": "gpt-4", + "stream": true + }, + "override": { + "endpoint": "http://localhost:7750" + }, + "ssl_verify": false + } + } + }]] + ) + + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- response_body +passed + + + +=== TEST 2: client disconnect aborts upstream read early +--- config + location /t { + content_by_lua_block { + local http = require("resty.http") + local httpc = http.new() + + local ok, err = httpc:connect({ + scheme = "http", + host = "localhost", + port = ngx.var.server_port, + }) + if not ok then + ngx.status = 500 + ngx.say("connect failed: ", err) + return + end + + local res, err = httpc:request({ + method = "POST", + headers = { ["Content-Type"] = "application/json" }, + path = "/anything", + body = [[{"messages": [{"role": "user", "content": "hi"}]}]], + }) + if not res then + ngx.status = 500 + ngx.say("request failed: ", err) + return + end + + -- Read exactly 3 chunks then close the connection abruptly. + for i = 1, 3 do + local chunk, rerr = res.body_reader() + if rerr or not chunk then + ngx.status = 500 + ngx.say("unexpected end of stream at chunk ", i, ": ", rerr) + return + end + end + httpc:close() + + -- Allow time for the proxy to detect the disconnect and stop + -- feeding the upstream connection, then capture the chunk count. 
+ ngx.sleep(1) + + -- Read chunk count from the mock upstream's probe endpoint. + local probe = http.new() + ok, err = probe:connect({ scheme = "http", host = "localhost", port = 7750 }) + if not ok then + ngx.status = 500 + ngx.say("probe connect failed: ", err) + return + end + local probe_res, probe_err = probe:request({ + method = "GET", + path = "/chunks", + headers = { Host = "localhost" }, + }) + if not probe_res then + ngx.status = 500 + ngx.say("probe request failed: ", probe_err) + return + end + local count_str = probe_res:read_body() + probe:close() + + local count = tonumber(count_str) or 0 + -- The mock emits a chunk roughly every 30ms, so without the fix the + -- count would be around 30+ after the 1s wait. With the fix the + -- upstream stops a few chunks after the disconnect, well below 20. + if count > 20 then + ngx.status = 500 + ngx.say("upstream was not aborted promptly, chunks: ", count) + return + end + ngx.say("ok, upstream aborted after ~", count, " chunks") + } + } +--- response_body_like +^ok, upstream aborted after ~\d+ chunks$ +--- error_log +client disconnected during AI streaming
