AlinsRan commented on code in PR #13606:
URL: https://github.com/apache/apisix/pull/13606#discussion_r3478841506
##########
apisix/plugins/ai-lakera-guard.lua:
##########
@@ -206,4 +221,76 @@ function _M.access(conf, ctx)
end
+function _M.lua_body_filter(conf, ctx, headers, body)
+    if conf.direction ~= "output" and conf.direction ~= "both" then
+        return
+    end
+
+    if ngx.status >= 400 then
+        return
+    end
+
+    -- Non-streaming: ai-proxy hands us the fully-assembled completion text.
+    if ctx.var.request_type == "ai_chat" then
+        local text = ctx.var.llm_response_text
+        if not text or text == "" then
+            return
+        end
+        return moderate_response(ctx, conf, text)
+    end
+
+    if ctx.var.request_type == "ai_stream" then
+        -- alert (shadow) mode non-blocking
+        if conf.action == "alert" then
+            if ctx.var.llm_request_done then
+                local text = ctx.var.llm_response_text
+                if text and text ~= "" then
+                    moderate_response(ctx, conf, text)
+                end
+            end
+            return
+        end
+
+        -- block mode
+        local buffer = ctx.lakera_response_buffer
+        if not buffer then
+            buffer = {}
+            ctx.lakera_response_buffer = buffer
+        end
+        buffer[#buffer + 1] = body or ""
+
+        if not ctx.var.llm_request_done then
+            -- Withhold this chunk until end-of-stream, replacing it with an SSE
+            -- keep-alive comment. Not "" (nginx treats an empty body as nothing
+            -- to flush) and not nil (which would let the original chunk reach
+            -- the client) -- the keep-alive holds the content back while keeping
+            -- the connection open.
+            return nil, ":\n\n"
+        end
+
+        local text = ctx.var.llm_response_text
+        if not text or text == "" then
+            if conf.fail_open then
+                core.log.warn("ai-lakera-guard: streamed response ended without ",
+                              "an assembled completion (no upstream usage event?); ",
+                              "fail_open=true, releasing unscanned")
+                return nil, concat(buffer)
+            end
+            core.log.error("ai-lakera-guard: streamed response ended without ",
+                           "an assembled completion (no upstream usage event?); ",
+                           "fail_open=false, blocking response")
+            return ngx.OK, deny_message(ctx, conf, conf.response_failure_message)
+        end
+
+        local code, message = moderate_response(ctx, conf, text)
+        if code then
+            return ngx.OK, message
+        end
+
+        -- Clean: release the buffered stream verbatim, preserving SSE framing.
+        return nil, concat(buffer)
Review Comment:
**Block-mode streaming can duplicate (or strand) the response when a protocol converter is active**

The release logic here keys "stream finished, flush the buffer" solely on
`ctx.var.llm_request_done`. That holds for the same-protocol path exercised by
the tests in this PR, where `parse_streaming_response` dispatches one
`lua_body_filter` call per upstream chunk and `[DONE]` is the last one. But
when a converter is active (cross-protocol, e.g. Anthropic client → OpenAI
upstream via `anthropic-messages-to-openai-chat`, reachable through
`ai-proxy-multi`), the assumption breaks:
- In `apisix/plugins/ai-providers/base.lua`, `ctx.var.llm_request_done` is
set while *parsing* the events of an upstream chunk (on `parsed.type ==
"done"`), i.e. **before** the dispatch loop; the dispatch loop then calls
`lua_body_filter` **once per converted chunk**.
- On `[DONE]`, `anthropic-messages-to-openai-chat.convert_sse_events`
flushes **two** events in a single call — `message_delta` + `message_stop`. So
the terminal upstream chunk yields 2 converted chunks, both dispatched with
`llm_request_done == true`.

**Variant A (upstream without `include_usage`):** the first converted chunk
triggers the scan, and `return nil, concat(buffer)` releases the whole buffer.
The buffer is never cleared, so the second converted chunk scans and releases
it **again** (now with `message_stop` appended) → the response body is
delivered twice and Lakera is called twice.

**Variant B (upstream with `include_usage`):** the converter defers
`message_delta`/`message_stop` to the trailing usage-only chunk, whose
`parsed.type == "usage"` does **not** set `llm_request_done`, so those events
are buffered behind keep-alives. `[DONE]` then sets `llm_request_done`, but its
`convert_sse_events` returns `nil` (nothing to dispatch), so `lua_body_filter`
is not called again. The buffer is therefore **never** released, and the normal
EOF path doesn't run a filter pass either → the client receives only keep-alives
and the clean response is dropped.
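
To make both failure modes concrete, here is a minimal, self-contained Lua model of the block-mode release logic in the hunk. It is a sketch under stated assumptions, not the plugin or `base.lua`: `block_mode_filter`, `run`, the two converter functions, and the event and chunk strings are hypothetical stand-ins; Lakera is replaced by a scan counter, and moderation always passes. The only behaviour modelled is the dispatch order described for `base.lua`: `llm_request_done` is set while parsing, then the filter runs once per converted chunk, and no filter pass happens at EOF.

```lua
-- Hypothetical model of the current block-mode path (no ngx, no Lakera call;
-- moderation always passes). Only the buffering/release logic is mirrored.
local function block_mode_filter(ctx, body)
    ctx.buffer = ctx.buffer or {}
    ctx.buffer[#ctx.buffer + 1] = body or ""
    if not ctx.llm_request_done then
        return nil, ":\n\n"              -- withhold chunk, send SSE keep-alive
    end
    ctx.scans = (ctx.scans or 0) + 1     -- stands in for moderate_response()
    return nil, table.concat(ctx.buffer) -- release everything buffered so far
end

-- Hypothetical converters. Without include_usage, "done" flushes two events;
-- with include_usage, they are deferred to the usage-only chunk instead.
local function convert_no_usage(ev)
    if ev.type == "delta" then return { "delta:" .. ev.text .. "\n" } end
    if ev.type == "done" then return { "message_delta\n", "message_stop\n" } end
end

local function convert_with_usage(ev)
    if ev.type == "delta" then return { "delta:" .. ev.text .. "\n" } end
    if ev.type == "usage" then return { "message_delta\n", "message_stop\n" } end
end

-- Assumed dispatch order: the done flag is set while parsing an upstream
-- chunk, before the per-converted-chunk filter calls. No EOF filter pass.
local function run(upstream_chunks, convert)
    local ctx, client = {}, {}
    for _, events in ipairs(upstream_chunks) do
        local converted = {}
        for _, ev in ipairs(events) do
            if ev.type == "done" then ctx.llm_request_done = true end
            for _, c in ipairs(convert(ev) or {}) do
                converted[#converted + 1] = c
            end
        end
        for _, chunk in ipairs(converted) do
            local _, out = block_mode_filter(ctx, chunk)
            client[#client + 1] = out or chunk -- nil would pass the chunk through
        end
    end
    return table.concat(client), ctx.scans or 0
end

local DELTA = { { type = "delta", text = "Hello" } }
local DONE = { { type = "done" } }
local USAGE = { { type = "usage" } }

-- Variant A: converter without include_usage.
local body_a, scans_a = run({ DELTA, DONE }, convert_no_usage)
-- Variant B: converter with include_usage.
local body_b, scans_b = run({ DELTA, USAGE, DONE }, convert_with_usage)
```

In this model, `body_a` contains `delta:Hello` twice and `scans_a == 2` (Variant A), while `body_b` is just three keep-alives and `scans_b == 0` (Variant B).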

Suggested fix: gate the scan+release behind a one-shot flag (e.g.
`ctx.lakera_response_released`) so it fires exactly once, and cover both the
"`llm_request_done` already set but this isn't the final dispatch" case and the
"buffer still unreleased at EOF" case (the latter may need an end-of-stream filter
pass in `base.lua`, mirroring the abort path added in this PR). A regression
test with `ai-proxy-multi` (Anthropic client + OpenAI streaming upstream) in
block mode would lock this down.
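
A minimal Lua sketch of that fix, under simplifying assumptions: there is no ngx and no Lakera call, moderation always passes, and the dispatch order is modelled as described above (done flag set during parsing, one filter call per converted chunk). Only the flag name `ctx.lakera_response_released` comes from the suggestion; `block_mode_filter`, `flush_at_eof`, `run`, and the converters are hypothetical, and a real fix would also have to keep the deny and `fail_open` paths.

```lua
-- Hypothetical sketch: scan+release gated behind a one-shot flag, plus an
-- end-of-stream pass for anything still withheld.
local function scan_and_release(ctx)
    ctx.lakera_response_released = true
    ctx.scans = (ctx.scans or 0) + 1      -- stands in for moderate_response()
    return table.concat(ctx.buffer or {})
end

local function block_mode_filter(ctx, body)
    if ctx.lakera_response_released then
        return                            -- no replacement: chunk passes through
    end
    ctx.buffer = ctx.buffer or {}
    ctx.buffer[#ctx.buffer + 1] = body or ""
    if not ctx.llm_request_done then
        return nil, ":\n\n"               -- withhold chunk, send SSE keep-alive
    end
    return nil, scan_and_release(ctx)
end

-- Hypothetical EOF hook (what an end-of-stream filter pass in base.lua would
-- do): release the withheld buffer if nothing has released it yet.
local function flush_at_eof(ctx)
    if ctx.lakera_response_released or not ctx.buffer then
        return nil
    end
    return scan_and_release(ctx)
end

-- Hypothetical converters for the two upstream variants.
local function convert_no_usage(ev)
    if ev.type == "delta" then return { "delta:" .. ev.text .. "\n" } end
    if ev.type == "done" then return { "message_delta\n", "message_stop\n" } end
end

local function convert_with_usage(ev)
    if ev.type == "delta" then return { "delta:" .. ev.text .. "\n" } end
    if ev.type == "usage" then return { "message_delta\n", "message_stop\n" } end
end

local function run(upstream_chunks, convert)
    local ctx, client = {}, {}
    for _, events in ipairs(upstream_chunks) do
        local converted = {}
        for _, ev in ipairs(events) do
            if ev.type == "done" then ctx.llm_request_done = true end
            for _, c in ipairs(convert(ev) or {}) do
                converted[#converted + 1] = c
            end
        end
        for _, chunk in ipairs(converted) do
            local _, out = block_mode_filter(ctx, chunk)
            client[#client + 1] = out or chunk
        end
    end
    local tail = flush_at_eof(ctx)        -- the extra end-of-stream pass
    if tail then client[#client + 1] = tail end
    return table.concat(client), ctx.scans or 0
end

local DELTA = { { type = "delta", text = "Hello" } }
local DONE = { { type = "done" } }
local USAGE = { { type = "usage" } }

local body_a, scans_a = run({ DELTA, DONE }, convert_no_usage)
local body_b, scans_b = run({ DELTA, USAGE, DONE }, convert_with_usage)
```

With this gating, both variants end with one scan and one copy of the body: in Variant A the first converted `[DONE]` chunk releases the buffer and `message_stop` passes through untouched, and in Variant B the EOF pass releases it. One assumption worth checking: chunks arriving after the release are not covered by the scan, which is only safe if, as in this model, they are framing events with no completion text; the alternative is to keep withholding until the final dispatch or EOF.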
--
This is an automated message from the Apache Git Service.