nic-6443 commented on code in PR #13254:
URL: https://github.com/apache/apisix/pull/13254#discussion_r3104766155


##########
apisix/plugin.lua:
##########
@@ -1397,13 +1397,22 @@ function _M.run_global_rules(api_ctx, global_rules, 
conf_version, phase_name)
     end
 end
 
-function _M.lua_response_filter(api_ctx, headers, body)
+-- @param wait boolean When true, use synchronous flush (ngx.flush(true)) and 
return
+--   (ok, err) so callers can detect client disconnection. Defaults to false 
for
+--   backward compatibility (async flush, no return value).

Review Comment:
   Fixed — updated the docstring to say the function always returns (ok, err) 
regardless of wait.



##########
apisix/plugins/ai-providers/base.lua:
##########
@@ -352,14 +352,36 @@ function _M.parse_streaming_response(self, ctx, res, 
target_proto, converter)
             ::CONTINUE::
         end
 
-        -- Output: converter events or passthrough raw chunk
+        -- Output: converter events or passthrough raw chunk.
+        -- Pass wait=true for synchronous flush so we can detect client 
disconnection.
         if converter then
             for _, c in ipairs(converted_chunks) do
-                plugin.lua_response_filter(ctx, res.headers, c)
+                local ok, flush_err = plugin.lua_response_filter(ctx, 
res.headers, c, true)
                 output_sent = true
+                if not ok then
+                    core.log.info("client disconnected during AI streaming, ",
+                                  "aborting upstream read: ", flush_err)
+                    if res._httpc then
+                        res._httpc:close()
+                        res._httpc = nil
+                    end
+                    ctx.var.llm_request_done = true
+                    return
+                end
             end
         else
-            plugin.lua_response_filter(ctx, res.headers, chunk)
+            local ok, flush_err = plugin.lua_response_filter(ctx, res.headers, 
chunk, true)
+            output_sent = true
+            if not ok then
+                core.log.info("client disconnected during AI streaming, ",
+                              "aborting upstream read: ", flush_err)
+                if res._httpc then
+                    res._httpc:close()
+                    res._httpc = nil
+                end
+                ctx.var.llm_request_done = true
+                return
+            end

Review Comment:
   Good call — extracted it into a local abort_on_disconnect(flush_err) helper 
inside parse_streaming_response. Both branches now just call that and return.



##########
apisix/plugin.lua:
##########
@@ -1397,13 +1397,22 @@ function _M.run_global_rules(api_ctx, global_rules, 
conf_version, phase_name)
     end
 end
 
-function _M.lua_response_filter(api_ctx, headers, body)
+-- @param wait boolean When true, use synchronous flush (ngx.flush(true)) and 
return
+--   (ok, err) so callers can detect client disconnection. Defaults to false 
for
+--   backward compatibility (async flush, no return value).
+function _M.lua_response_filter(api_ctx, headers, body, wait)
     local plugins = api_ctx.plugins
     if not plugins or #plugins == 0 then
         -- if there is no any plugin, just print the original body to 
downstream
-        ngx_print(body)
-        ngx_flush()
-        return
+        local ok, err = ngx_print(body)
+        if not ok then
+            return false, err
+        end
+        ok, err = ngx_flush(wait)
+        if not ok then

Review Comment:
   Fixed — now calls ngx_flush(true) when wait==true and ngx_flush() (no args) 
otherwise, so nil is never passed through.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to