This is an automated email from the ASF dual-hosted git repository.

nic-6443 pushed a commit to branch fix/ai-proxy-client-disconnect
in repository https://gitbox.apache.org/repos/asf/apisix.git

commit 092f7dcef7be71a470cda8d28d95568ed95a5087
Author: Nic <[email protected]>
AuthorDate: Sat Apr 18 14:51:43 2026 +0800

    feat(ai-proxy): abort upstream read on client disconnect during streaming
    
    When a downstream client disconnected mid-stream, the proxy kept reading
    all remaining chunks from the LLM and needlessly performed SSE parsing,
    token counting, and protocol conversion on them.
    
    Fix this by passing wait=true to lua_response_filter in the streaming
    path. With a synchronous flush, ngx.flush(true) returns an error once the
    client connection is gone; at that point we close the upstream httpc
    connection and return early.
    
    Changes:
    - plugin.lua: add optional wait param to lua_response_filter; return
      (ok, err) so callers can detect client disconnection
    - ai-providers/base.lua: use wait=true in parse_streaming_response output
      loop; on flush failure close upstream and return immediately
---
 apisix/plugin.lua                     |  28 ++++-
 apisix/plugins/ai-providers/base.lua  |  28 ++++-
 t/plugin/ai-proxy-client-disconnect.t | 209 ++++++++++++++++++++++++++++++++++
 3 files changed, 256 insertions(+), 9 deletions(-)

diff --git a/apisix/plugin.lua b/apisix/plugin.lua
index 723ededd0..942efebe6 100644
--- a/apisix/plugin.lua
+++ b/apisix/plugin.lua
@@ -1397,13 +1397,22 @@ function _M.run_global_rules(api_ctx, global_rules, 
conf_version, phase_name)
     end
 end
 
-function _M.lua_response_filter(api_ctx, headers, body)
+-- @param wait boolean When true, use synchronous flush (ngx.flush(true)) so write
+--   failures such as a closed client connection are reported. Defaults to nil
+--   (async flush). Returns true once printed and flushed, else false and an error.
+function _M.lua_response_filter(api_ctx, headers, body, wait)
     local plugins = api_ctx.plugins
     if not plugins or #plugins == 0 then
         -- if there is no any plugin, just print the original body to 
downstream
-        ngx_print(body)
-        ngx_flush()
-        return
+        local ok, err = ngx_print(body)
+        if not ok then
+            return false, err
+        end
+        ok, err = ngx_flush(wait)
+        if not ok then
+            return false, err
+        end
+        return true
     end
     for i = 1, #plugins, 2 do
         local phase_func = plugins[i]["lua_body_filter"]
@@ -1430,8 +1439,15 @@ function _M.lua_response_filter(api_ctx, headers, body)
 
         ::CONTINUE::
     end
-    ngx_print(body)
-    ngx_flush()
+    local ok, err = ngx_print(body)
+    if not ok then
+        return false, err
+    end
+    ok, err = ngx_flush(wait)
+    if not ok then
+        return false, err
+    end
+    return true
 end
 
 
diff --git a/apisix/plugins/ai-providers/base.lua 
b/apisix/plugins/ai-providers/base.lua
index db07dff70..8227922d6 100644
--- a/apisix/plugins/ai-providers/base.lua
+++ b/apisix/plugins/ai-providers/base.lua
@@ -352,14 +352,36 @@ function _M.parse_streaming_response(self, ctx, res, 
target_proto, converter)
             ::CONTINUE::
         end
 
-        -- Output: converter events or passthrough raw chunk
+        -- Output: converter events or passthrough raw chunk.
+        -- Pass wait=true for a synchronous flush so a client disconnect is detected.
         if converter then
             for _, c in ipairs(converted_chunks) do
-                plugin.lua_response_filter(ctx, res.headers, c)
+                local ok, flush_err = plugin.lua_response_filter(ctx, 
res.headers, c, true)
                 output_sent = true
+                if not ok then
+                    core.log.info("client disconnected during AI streaming, ",
+                                  "aborting upstream read: ", flush_err)
+                    if res._httpc then
+                        res._httpc:close()
+                        res._httpc = nil
+                    end
+                    ctx.var.llm_request_done = true
+                    return
+                end
             end
         else
-            plugin.lua_response_filter(ctx, res.headers, chunk)
+            local ok, flush_err = plugin.lua_response_filter(ctx, res.headers, 
chunk, true)
+            output_sent = true
+            if not ok then
+                core.log.info("client disconnected during AI streaming, ",
+                              "aborting upstream read: ", flush_err)
+                if res._httpc then
+                    res._httpc:close()
+                    res._httpc = nil
+                end
+                ctx.var.llm_request_done = true
+                return
+            end
         end
     end
 end
diff --git a/t/plugin/ai-proxy-client-disconnect.t 
b/t/plugin/ai-proxy-client-disconnect.t
new file mode 100644
index 000000000..c9750927a
--- /dev/null
+++ b/t/plugin/ai-proxy-client-disconnect.t
@@ -0,0 +1,209 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+use t::APISIX 'no_plan';
+
+log_level("info");
+repeat_each(1);
+no_long_string();
+no_root_location();
+
+
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    if (!defined $block->request) {
+        $block->set_value("request", "GET /t");
+    }
+
+    # Mock upstream: OpenAI-style SSE server that emits a chunk every 30ms until
+    # a write fails, counting flushed chunks in the "test" shared dict.
+    my $http_config = $block->http_config // <<_EOC_;
+        server {
+            server_name slow_openai_sse;
+            listen 7750;
+
+            default_type 'text/event-stream';
+
+            location /v1/chat/completions {
+                content_by_lua_block {
+                    ngx.header["Content-Type"] = "text/event-stream"
+                    local dict = ngx.shared["test"]
+                    dict:set("upstream_chunks", 0)
+                    -- Emit up to 2000 chunks, 30ms apart. A chunk is counted only
+                    -- after ngx.flush(true) succeeds, so the count stops growing
+                    -- once the proxy closes this upstream connection.
+                    for i = 1, 2000 do
+                        local ok, err = ngx.print(
+                            'data: {"id":"chatcmpl-1","object":'
+                            .. '"chat.completion.chunk","choices":[{"delta":'
+                            .. '{"content":"tok"},"index":0,'
+                            .. '"finish_reason":null}],"usage":null}\\n\\n')
+                        if not ok then
+                            return
+                        end
+                        local flush_ok = ngx.flush(true)
+                        if not flush_ok then
+                            return
+                        end
+                        dict:set("upstream_chunks", i)
+                        ngx.sleep(0.03)
+                    end
+                }
+            }
+
+            # Probe endpoint: reports how many chunks have been flushed so far.
+            location /chunks {
+                content_by_lua_block {
+                    local dict = ngx.shared["test"]
+                    ngx.say(dict:get("upstream_chunks") or 0)
+                }
+            }
+        }
+_EOC_
+
+    $block->set_value("http_config", $http_config);
+});
+
+
+run_tests();
+
+__DATA__
+
+=== TEST 1: set route for client disconnect test
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                    "uri": "/anything",
+                    "plugins": {
+                        "ai-proxy": {
+                            "provider": "openai",
+                            "auth": {
+                                "header": {
+                                    "Authorization": "Bearer token"
+                                }
+                            },
+                            "options": {
+                                "model": "gpt-4",
+                                "stream": true
+                            },
+                            "override": {
+                                "endpoint": "http://localhost:7750"
+                            },
+                            "ssl_verify": false
+                        }
+                    }
+                }]]
+            )
+
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 2: client disconnect aborts upstream read early
+--- config
+    location /t {
+        content_by_lua_block {
+            local http = require("resty.http")
+
+            -- Return the number of chunks the mock upstream has flushed so far.
+            local function upstream_chunks()
+                local probe = http.new()
+                local probe_res, probe_err = probe:request_uri("http://127.0.0.1:7750/chunks")
+                if not probe_res then
+                    return nil, probe_err
+                end
+                local count = tonumber(probe_res.body)
+                if not count then
+                    return nil, "unexpected probe body: " .. tostring(probe_res.body)
+                end
+                return count
+            end
+
+            local httpc = http.new()
+            local ok, err = httpc:connect({
+                scheme = "http",
+                host = "localhost",
+                port = ngx.var.server_port,
+            })
+            if not ok then
+                ngx.status = 500
+                ngx.say("connect failed: ", err)
+                return
+            end
+
+            local res, req_err = httpc:request({
+                method = "POST",
+                headers = { ["Content-Type"] = "application/json" },
+                path = "/anything",
+                body = [[{"messages": [{"role": "user", "content": "hi"}]}]],
+            })
+            if not res then
+                ngx.status = 500
+                ngx.say("request failed: ", req_err)
+                return
+            end
+
+            -- Read exactly 3 chunks then close the connection abruptly.
+            for i = 1, 3 do
+                local chunk, rerr = res.body_reader()
+                if rerr or not chunk then
+                    ngx.status = 500
+                    ngx.say("unexpected end of stream at chunk ", i, ": ", rerr)
+                    return
+                end
+            end
+            httpc:close()
+
+            -- Let the proxy notice the disconnect and close its upstream
+            -- connection, then take a baseline sample of the chunk count.
+            ngx.sleep(0.3)
+            local before, before_err = upstream_chunks()
+            -- The mock emits a chunk every 30ms while it is still being read,
+            -- so an un-aborted upstream would add ~16 chunks in the next 0.5s.
+            ngx.sleep(0.5)
+            local after, after_err = upstream_chunks()
+            if not before or not after then
+                ngx.status = 500
+                ngx.say("probe failed: ", before_err or after_err)
+                return
+            end
+            -- Allow a little slack for writes already in flight at the sample.
+            if after - before > 3 then
+                ngx.status = 500
+                ngx.say("upstream was not aborted, chunks grew from ",
+                        before, " to ", after)
+                return
+            end
+            ngx.say("ok, upstream aborted after ~", after, " chunks")
+        }
+    }
+--- response_body_like
+^ok, upstream aborted after ~\d+ chunks$
+--- error_log
+client disconnected during AI streaming

Reply via email to