nic-6443 commented on code in PR #13254:
URL: https://github.com/apache/apisix/pull/13254#discussion_r3104766155
##########
apisix/plugin.lua:
##########
@@ -1397,13 +1397,22 @@ function _M.run_global_rules(api_ctx, global_rules, conf_version, phase_name)
end
end
-function _M.lua_response_filter(api_ctx, headers, body)
+-- @param wait boolean When true, use synchronous flush (ngx.flush(true)) and return
+-- (ok, err) so callers can detect client disconnection. Defaults to false for
+-- backward compatibility (async flush, no return value).
Review Comment:
Fixed — updated the docstring to say the function always returns (ok, err)
regardless of wait.
##########
apisix/plugins/ai-providers/base.lua:
##########
@@ -352,14 +352,36 @@ function _M.parse_streaming_response(self, ctx, res, target_proto, converter)
::CONTINUE::
end
- -- Output: converter events or passthrough raw chunk
+ -- Output: converter events or passthrough raw chunk.
+ -- Pass wait=true for synchronous flush so we can detect client disconnection.
if converter then
for _, c in ipairs(converted_chunks) do
- plugin.lua_response_filter(ctx, res.headers, c)
+ local ok, flush_err = plugin.lua_response_filter(ctx, res.headers, c, true)
output_sent = true
+ if not ok then
+ core.log.info("client disconnected during AI streaming, ",
+ "aborting upstream read: ", flush_err)
+ if res._httpc then
+ res._httpc:close()
+ res._httpc = nil
+ end
+ ctx.var.llm_request_done = true
+ return
+ end
end
else
- plugin.lua_response_filter(ctx, res.headers, chunk)
+ local ok, flush_err = plugin.lua_response_filter(ctx, res.headers, chunk, true)
+ output_sent = true
+ if not ok then
+ core.log.info("client disconnected during AI streaming, ",
+ "aborting upstream read: ", flush_err)
+ if res._httpc then
+ res._httpc:close()
+ res._httpc = nil
+ end
+ ctx.var.llm_request_done = true
+ return
+ end
Review Comment:
Good call — extracted it into a local abort_on_disconnect(flush_err) helper
inside parse_streaming_response. Both branches now just call that and return.
##########
apisix/plugin.lua:
##########
@@ -1397,13 +1397,22 @@ function _M.run_global_rules(api_ctx, global_rules, conf_version, phase_name)
end
end
-function _M.lua_response_filter(api_ctx, headers, body)
+-- @param wait boolean When true, use synchronous flush (ngx.flush(true)) and return
+-- (ok, err) so callers can detect client disconnection. Defaults to false for
+-- backward compatibility (async flush, no return value).
+function _M.lua_response_filter(api_ctx, headers, body, wait)
local plugins = api_ctx.plugins
if not plugins or #plugins == 0 then
-- if there is no any plugin, just print the original body to downstream
- ngx_print(body)
- ngx_flush()
- return
+ local ok, err = ngx_print(body)
+ if not ok then
+ return false, err
+ end
+ ok, err = ngx_flush(wait)
+ if not ok then
Review Comment:
Fixed — now calls ngx_flush(true) when wait==true and ngx_flush() (no args)
otherwise, so nil is never passed through.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]