This is an automated email from the ASF dual-hosted git repository.

Baoyuantop pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new ecbb6fe89 feat(ai-proxy): add max_stream_duration_ms and 
max_response_bytes safeguards (#13250)
ecbb6fe89 is described below

commit ecbb6fe89f96ce0b371964dd057d2cb17e665784
Author: Nic <[email protected]>
AuthorDate: Mon Apr 20 10:42:51 2026 +0800

    feat(ai-proxy): add max_stream_duration_ms and max_response_bytes 
safeguards (#13250)
---
 apisix/plugins/ai-providers/base.lua     | 118 ++++++++-
 apisix/plugins/ai-proxy/base.lua         |  10 +-
 apisix/plugins/ai-proxy/schema.lua       |  32 +++
 docs/en/latest/plugins/ai-proxy-multi.md |   4 +-
 docs/en/latest/plugins/ai-proxy.md       |   4 +-
 docs/zh/latest/plugins/ai-proxy-multi.md |   4 +-
 docs/zh/latest/plugins/ai-proxy.md       |   4 +-
 t/plugin/ai-proxy-stream-limits.t        | 431 +++++++++++++++++++++++++++++++
 8 files changed, 595 insertions(+), 12 deletions(-)

diff --git a/apisix/plugins/ai-providers/base.lua 
b/apisix/plugins/ai-providers/base.lua
index db07dff70..cb49263f5 100644
--- a/apisix/plugins/ai-providers/base.lua
+++ b/apisix/plugins/ai-providers/base.lua
@@ -38,6 +38,7 @@ local log_sanitize = require("apisix.utils.log-sanitize")
 local protocols = require("apisix.plugins.ai-protocols")
 local ngx = ngx
 local ngx_now = ngx.now
+local tonumber = tonumber
 
 local table = table
 local pairs = pairs
@@ -196,11 +197,70 @@ end
 -- using the client protocol module.
 -- @param client_proto table The protocol module for the client's protocol
 -- @param converter table|nil The converter module (if protocol conversion 
needed)
+-- @param conf table|nil Plugin configuration (used for response size limits)
 -- @return table|nil Parsed and optionally converted response body
 -- @return string|nil Error
-function _M.parse_response(self, ctx, res, client_proto, converter)
+function _M.parse_response(self, ctx, res, client_proto, converter, conf)
     local headers = res.headers
-    local raw_res_body, err = res:read_body()
+
+    -- Pre-check Content-Length against max_response_bytes when the upstream
+    -- advertises it. For responses without Content-Length (chunked), we read
+    -- the body in bounded chunks below and enforce the cap incrementally.
+    local max_bytes = conf and conf.max_response_bytes
+    if max_bytes then
+        local content_length = tonumber(headers["Content-Length"])
+        if content_length and content_length > max_bytes then
+            core.log.warn("aborting AI response: Content-Length ", 
content_length,
+                          " exceeds max_response_bytes ", max_bytes)
+            if res._httpc then
+                res._httpc:close()
+                res._httpc = nil
+            end
+            return nil, "max_response_bytes exceeded", 502
+        end
+    end
+
+    local raw_res_body, err
+    if max_bytes then
+        -- Read in chunks so a runaway chunked upstream cannot force the
+        -- worker to buffer arbitrarily many bytes before the cap trips.
+        local body_reader = res.body_reader
+        if not body_reader then
+            -- Defensive: if no reader, fall back to read_body() and accept
+            -- that the cap is only post-facto for this path.
+            raw_res_body, err = res:read_body()
+        else
+            local parts = {}
+            local total = 0
+            while true do
+                local chunk, read_err = body_reader()
+                if read_err then
+                    err = read_err
+                    break
+                end
+                if not chunk then
+                    break
+                end
+                total = total + #chunk
+                if total > max_bytes then
+                    core.log.warn("aborting AI response: body size exceeds",
+                                  " max_response_bytes ", max_bytes,
+                                  " (read ", total, " bytes)")
+                    if res._httpc then
+                        res._httpc:close()
+                        res._httpc = nil
+                    end
+                    return nil, "max_response_bytes exceeded", 502
+                end
+                parts[#parts + 1] = chunk
+            end
+            if not err then
+                raw_res_body = table.concat(parts)
+            end
+        end
+    else
+        raw_res_body, err = res:read_body()
+    end
     if not raw_res_body then
         core.log.warn("failed to read response body: ", err)
         return nil, err
@@ -261,7 +321,8 @@ end
 -- transforming events to client format.
 -- @param target_proto table The protocol module for the provider's native 
protocol
 -- @param converter table|nil The converter module (if protocol conversion 
needed)
-function _M.parse_streaming_response(self, ctx, res, target_proto, converter)
+-- @param conf table|nil Plugin configuration (used for stream duration and 
size limits)
+function _M.parse_streaming_response(self, ctx, res, target_proto, converter, 
conf)
     local body_reader = res.body_reader
     local contents = {}
     local sse_state = { is_first = true }
@@ -272,6 +333,15 @@ function _M.parse_streaming_response(self, ctx, res, 
target_proto, converter)
     -- uncommitted and causing nginx to fall through to the balancer phase.
     local output_sent = false
 
+    -- Runaway-upstream safeguards. Both are opt-in; unset means no cap.
+    local max_duration_ms = conf and conf.max_stream_duration_ms
+    local max_bytes = conf and conf.max_response_bytes
+    local deadline
+    if max_duration_ms then
+        deadline = ctx.llm_request_start_time + max_duration_ms / 1000
+    end
+    local bytes_read = 0
+
     while true do
         local chunk, err = body_reader()
         ctx.var.apisix_upstream_response_time = math.floor((ngx_now() -
@@ -296,6 +366,8 @@ function _M.parse_streaming_response(self, ctx, res, 
target_proto, converter)
             return
         end
 
+        bytes_read = bytes_read + #chunk
+
         if ctx.var.llm_time_to_first_token == "0" then
             ctx.var.llm_time_to_first_token = math.floor(
                                             (ngx_now() - 
ctx.llm_request_start_time) * 1000)
@@ -360,6 +432,46 @@ function _M.parse_streaming_response(self, ctx, res, 
target_proto, converter)
             end
         else
             plugin.lua_response_filter(ctx, res.headers, chunk)
+            output_sent = true
+        end
+
+        -- Enforce runaway-upstream safeguards after processing the chunk.
+        -- Checked post-flush so clients still see any bytes we already 
emitted.
+        local limit_hit
+        if deadline and ngx_now() >= deadline then
+            limit_hit = "max_stream_duration_ms"
+        elseif max_bytes and bytes_read > max_bytes then
+            limit_hit = "max_response_bytes"
+        end
+        if limit_hit then
+            local duration_ms = math.floor((ngx_now() -
+                                            ctx.llm_request_start_time) * 1000)
+            core.log.warn("aborting AI stream: ", limit_hit, " exceeded;",
+                          " bytes=", bytes_read,
+                          " duration_ms=", duration_ms,
+                          " route_id=", ctx.var.route_id or "")
+            -- Force-close upstream so we don't pool a half-drained connection.
+            if res._httpc then
+                res._httpc:close()
+                res._httpc = nil
+            end
+            -- Signal downstream filters (e.g. moderation plugins that defer
+            -- work until request completion) that no more content is coming.
+            ctx.var.llm_request_done = true
+            if output_sent then
+                -- Client has already received partial SSE; stop feeding 
chunks.
+                -- nginx will close the downstream connection at end of content
+                -- phase. Clients detect incomplete responses via the absence
+                -- of a protocol-specific terminator (e.g. OpenAI [DONE],
+                -- Anthropic message_stop, Responses response.completed).
+                return
+            end
+            -- No bytes flushed yet (e.g. converter skipped all events so far).
+            -- Surface as 504 for duration (timeout-like) or 502 for size-limit
+            -- (bad gateway response), so on_error / fallback policies can
+            -- distinguish the failure modes.
+            local status = limit_hit == "max_stream_duration_ms" and 504 or 502
+            return status, limit_hit .. " exceeded"
         end
     end
 end
diff --git a/apisix/plugins/ai-proxy/base.lua b/apisix/plugins/ai-proxy/base.lua
index c893c4ec4..5ce2658e4 100644
--- a/apisix/plugins/ai-proxy/base.lua
+++ b/apisix/plugins/ai-proxy/base.lua
@@ -227,13 +227,13 @@ function _M.before_proxy(conf, ctx, on_error)
                     core.log.error("no protocol module for streaming target: 
", target_proto)
                     return 500
                 end
-                code = ai_provider:parse_streaming_response(
-                    ctx, res, target_proto_module, converter)
+                code, body = ai_provider:parse_streaming_response(
+                    ctx, res, target_proto_module, converter, conf)
             else
-                local _, parse_err = ai_provider:parse_response(
-                    ctx, res, client_proto, converter)
+                local _, parse_err, parse_status = ai_provider:parse_response(
+                    ctx, res, client_proto, converter, conf)
                 if parse_err then
-                    code = 500
+                    code = parse_status or 500
                     body = parse_err
                 end
             end
diff --git a/apisix/plugins/ai-proxy/schema.lua 
b/apisix/plugins/ai-proxy/schema.lua
index 397f210f3..6c9dc7037 100644
--- a/apisix/plugins/ai-proxy/schema.lua
+++ b/apisix/plugins/ai-proxy/schema.lua
@@ -183,6 +183,22 @@ _M.ai_proxy_schema = {
             default = 30000,
             description = "timeout in milliseconds",
         },
+        max_stream_duration_ms = {
+            type = "integer",
+            minimum = 1,
+            description = "Maximum wall-clock duration (in milliseconds) for a 
"
+                       .. "streaming AI response. If the upstream keeps 
sending "
+                       .. "data past this deadline, the connection is closed. "
+                       .. "Unset means no cap. Use this to protect the gateway 
"
+                       .. "from upstream bugs that produce tokens 
indefinitely.",
+        },
+        max_response_bytes = {
+            type = "integer",
+            minimum = 1,
+            description = "Maximum total bytes read from the upstream for a "
+                       .. "single AI response (streaming or non-streaming). If 
"
+                       .. "exceeded, the connection is closed. Unset means no 
cap.",
+        },
         keepalive = {type = "boolean", default = true},
         keepalive_timeout = {
             type = "integer",
@@ -258,6 +274,22 @@ _M.ai_proxy_multi_schema = {
             default = 30000,
             description = "timeout in milliseconds",
         },
+        max_stream_duration_ms = {
+            type = "integer",
+            minimum = 1,
+            description = "Maximum wall-clock duration (in milliseconds) for a 
"
+                       .. "streaming AI response. If the upstream keeps 
sending "
+                       .. "data past this deadline, the connection is closed. "
+                       .. "Unset means no cap. Use this to protect the gateway 
"
+                       .. "from upstream bugs that produce tokens 
indefinitely.",
+        },
+        max_response_bytes = {
+            type = "integer",
+            minimum = 1,
+            description = "Maximum total bytes read from the upstream for a "
+                       .. "single AI response (streaming or non-streaming). If 
"
+                       .. "exceeded, the connection is closed. Unset means no 
cap.",
+        },
         keepalive = {type = "boolean", default = true},
         keepalive_timeout = {
             type = "integer",
diff --git a/docs/en/latest/plugins/ai-proxy-multi.md 
b/docs/en/latest/plugins/ai-proxy-multi.md
index 2a71760b3..86f3ba9fc 100644
--- a/docs/en/latest/plugins/ai-proxy-multi.md
+++ b/docs/en/latest/plugins/ai-proxy-multi.md
@@ -99,7 +99,9 @@ In addition, the Plugin also supports logging LLM request 
information in the acc
 | instances.checks.active.unhealthy.http_statuses | array[integer] | False  | 
[429,404,500,501,502,503,504,505] | status code between 200 and 599 inclusive | 
An array of HTTP status codes that defines an unhealthy node. |
 | instances.checks.active.unhealthy.http_failures | integer      | False    | 
5                               | between 1 and 254 inclusive | Number of HTTP 
failures to define an unhealthy node. |
 | instances.checks.active.unhealthy.timeout     | integer        | False    | 
3                               | between 1 and 254 inclusive | Number of probe 
timeouts to define an unhealthy node. |
-| timeout                             | integer        | False    | 30000      
                     | greater than or equal to 1 | Request timeout in 
milliseconds when requesting the LLM service. |
+| timeout                             | integer        | False    | 30000      
                     | greater than or equal to 1 | Request timeout in 
milliseconds when requesting the LLM service. Applied per socket operation 
(connect / send / read block); does not cap the total duration of a streaming 
response. |
+| max_stream_duration_ms              | integer        | False    |            
                     | greater than or equal to 1 | Maximum wall-clock duration 
(in milliseconds) for a streaming AI response. If the upstream keeps sending 
data past this deadline, the gateway closes the connection. Unset means no cap. 
Use this to protect the gateway from upstream bugs that produce tokens 
indefinitely. When the limit is hit mid-stream, the downstream SSE stream is 
truncated (no protocol-speci [...]
+| max_response_bytes                  | integer        | False    |            
                     | greater than or equal to 1 | Maximum total bytes read 
from the upstream for a single AI response (streaming or non-streaming). If 
exceeded, the gateway closes the connection. For non-streaming responses with 
`Content-Length`, the check is performed before reading the body; for chunked 
(no-`Content-Length`) non-streaming responses and for streaming responses, the 
cap is enforced increment [...]
 | keepalive                           | boolean        | False    | true       
                     |              | If true, keep the connection alive when 
requesting the LLM service. |
 | keepalive_timeout                   | integer        | False    | 60000      
                     | greater than or equal to 1000 | Request timeout in 
milliseconds when requesting the LLM service. |
 | keepalive_pool                      | integer        | False    | 30         
                     |              | Keepalive pool size for when connecting 
with the LLM service. |
diff --git a/docs/en/latest/plugins/ai-proxy.md 
b/docs/en/latest/plugins/ai-proxy.md
index dab9f784e..b2aed0c0a 100644
--- a/docs/en/latest/plugins/ai-proxy.md
+++ b/docs/en/latest/plugins/ai-proxy.md
@@ -69,7 +69,9 @@ In addition, the Plugin also supports logging LLM request 
information in the acc
 | logging        | object  | False    |         |                              
            | Logging configurations. Does not affect `error.log`. |
 | logging.summaries | boolean | False | false |                                
          | If true, logs request LLM model, duration, request, and response 
tokens. |
 | logging.payloads  | boolean | False | false |                                
          | If true, logs request and response payload. |
-| timeout        | integer | False    | 30000    | ≥ 1                         
             | Request timeout in milliseconds when requesting the LLM service. 
|
+| timeout        | integer | False    | 30000    | ≥ 1                         
             | Request timeout in milliseconds when requesting the LLM service. 
Applied per socket operation (connect / send / read block); does not cap the 
total duration of a streaming response. |
+| max_stream_duration_ms | integer | False |        | ≥ 1                      
                | Maximum wall-clock duration (in milliseconds) for a streaming 
AI response. If the upstream keeps sending data past this deadline, the gateway 
closes the connection. Unset means no cap. Use this to protect the gateway from 
upstream bugs that produce tokens indefinitely. When the limit is hit 
mid-stream, the downstream SSE stream is truncated (no protocol-specific 
terminator such as `[DONE]`, ` [...]
+| max_response_bytes     | integer | False |        | ≥ 1                      
                | Maximum total bytes read from the upstream for a single AI 
response (streaming or non-streaming). If exceeded, the gateway closes the 
connection. For non-streaming responses with `Content-Length`, the check is 
performed before reading the body; for chunked (no-`Content-Length`) 
non-streaming responses and for streaming responses, the cap is enforced 
incrementally as bytes are received. Unset  [...]
 | keepalive      | boolean | False    | true   |                               
           | If true, keeps the connection alive when requesting the LLM 
service. |
 | keepalive_timeout | integer | False | 60000  | ≥ 1000                        
           | Keepalive timeout in milliseconds when connecting to the LLM 
service. |
 | keepalive_pool | integer | False    | 30       |                             
             | Keepalive pool size for the LLM service connection. |
diff --git a/docs/zh/latest/plugins/ai-proxy-multi.md 
b/docs/zh/latest/plugins/ai-proxy-multi.md
index a764d7d11..d7f57d5df 100644
--- a/docs/zh/latest/plugins/ai-proxy-multi.md
+++ b/docs/zh/latest/plugins/ai-proxy-multi.md
@@ -101,7 +101,9 @@ import TabItem from '@theme/TabItem';
 | instances.checks.active.unhealthy.http_statuses | array[integer] | 否  | 
[429,404,500,501,502,503,504,505] | 200 到 599 之间的状态码(包含) | 定义不健康节点的 HTTP 状态码数组。 
|
 | instances.checks.active.unhealthy.http_failures | integer      | 否    | 5    
                           | 1 到 254(包含) | 定义不健康节点的 HTTP 失败次数。 |
 | instances.checks.active.unhealthy.timeout     | integer        | 否    | 3    
                           | 1 到 254(包含) | 定义不健康节点的探测超时次数。 |
-| timeout                             | integer        | 否    | 30000          
                 | 大于或等于 1 | 请求 LLM 服务时的请求超时时间(毫秒)。 |
+| timeout                             | integer        | 否    | 30000          
                 | 大于或等于 1 | 请求 LLM 服务时的请求超时时间(毫秒)。应用于单次 socket 操作(连接 / 发送 / 
读取块),不限制流式响应的总时长。 |
+| max_stream_duration_ms              | integer        | 否    |                
                 | 大于或等于 1 | 流式 AI 
响应的总墙钟时长上限(毫秒)。若上游在此时间后仍持续发送数据,网关将关闭连接。未设置时不限制。用于防护上游持续输出 token 导致网关 CPU 
被打满的异常情况。中途触发上限时,下游 SSE 流会被截断(不再发送协议特定的终止标记,例如 `[DONE]`、`message_stop` 或 
`response.completed`),客户端应将缺失的终止标记视为响应未完成。 |
+| max_response_bytes                  | integer        | 否    |                
                 | 大于或等于 1 | 单次 AI 响应(流式或非流式)允许从上游读取的最大总字节数。超出时关闭连接。非流式响应若存在 
`Content-Length`,在读取 body 之前预检;否则(chunked 传输)与流式响应一样在接收字节的过程中增量检查。未设置时不限制。 |
 | keepalive                           | boolean        | 否    | true           
                 |              | 如果为 true,在请求 LLM 服务时保持连接活跃。 |
 | keepalive_timeout                   | integer        | 否    | 60000          
                 | 大于或等于 1000 | 请求 LLM 服务时的请求超时时间(毫秒)。 |
 | keepalive_pool                      | integer        | 否    | 30             
                 |              | 连接 LLM 服务时的保活池大小。 |
diff --git a/docs/zh/latest/plugins/ai-proxy.md 
b/docs/zh/latest/plugins/ai-proxy.md
index ebf27332b..3d2354ada 100644
--- a/docs/zh/latest/plugins/ai-proxy.md
+++ b/docs/zh/latest/plugins/ai-proxy.md
@@ -69,7 +69,9 @@ description: ai-proxy 插件通过将插件配置转换为所需的请求格式
 | logging        | object  | 否    |         |                                  
        | 日志配置。不影响 `error.log`。 |
 | logging.summaries | boolean | 否 | false |                                    
      | 如果为 true,记录请求 LLM 模型、持续时间、请求和响应令牌。 |
 | logging.payloads  | boolean | 否 | false |                                    
      | 如果为 true,记录请求和响应负载。 |
-| timeout        | integer | 否    | 30000    | ≥ 1                             
         | 请求 LLM 服务时的请求超时时间(毫秒)。 |
+| timeout        | integer | 否    | 30000    | ≥ 1                             
         | 请求 LLM 服务时的请求超时时间(毫秒)。应用于单次 socket 操作(连接 / 发送 / 读取块),不限制流式响应的总时长。 |
+| max_stream_duration_ms | integer | 否 |        | ≥ 1                          
            | 流式 AI 响应的总墙钟时长上限(毫秒)。若上游在此时间后仍持续发送数据,网关将关闭连接。未设置时不限制。用于防护上游持续输出 
token 导致网关 CPU 被打满的异常情况。中途触发上限时,下游 SSE 流会被截断(不再发送协议特定的终止标记,例如 
`[DONE]`、`message_stop` 或 `response.completed`),客户端应将缺失的终止标记视为响应未完成。 |
+| max_response_bytes     | integer | 否 |        | ≥ 1                          
            | 单次 AI 响应(流式或非流式)允许从上游读取的最大总字节数。超出时关闭连接。非流式响应若存在 
`Content-Length`,在读取 body 之前预检;否则(chunked 传输)与流式响应一样在接收字节的过程中增量检查。未设置时不限制。 |
 | keepalive      | boolean | 否    | true   |                                   
       | 如果为 true,在请求 LLM 服务时保持连接活跃。 |
 | keepalive_timeout | integer | 否 | 60000  | ≥ 1000                            
       | 连接到 LLM 服务时的保活超时时间(毫秒)。 |
 | keepalive_pool | integer | 否    | 30       |                                 
         | LLM 服务连接的保活池大小。 |
diff --git a/t/plugin/ai-proxy-stream-limits.t 
b/t/plugin/ai-proxy-stream-limits.t
new file mode 100644
index 000000000..6173e9dba
--- /dev/null
+++ b/t/plugin/ai-proxy-stream-limits.t
@@ -0,0 +1,431 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+use t::APISIX 'no_plan';
+
+log_level("info");
+repeat_each(1);
+no_long_string();
+no_root_location();
+
+
+add_block_preprocessor(sub {
+    my ($block) = @_;
+
+    if (!defined $block->request) {
+        $block->set_value("request", "GET /t");
+    }
+
+    # Shared mock upstream: a runaway SSE server that streams OpenAI chat
+    # completion chunks indefinitely and never sends "[DONE]".
+    my $http_config = $block->http_config // <<_EOC_;
+        server {
+            server_name runaway_openai_sse;
+            listen 7740;
+
+            default_type 'text/event-stream';
+
+            location /v1/chat/completions {
+                content_by_lua_block {
+                    ngx.header["Content-Type"] = "text/event-stream"
+                    -- Bound by an upper limit so test-nginx never hangs even
+                    -- if the plugin safeguard misfires; in practice the plugin
+                    -- should abort long before this completes.
+                    for i = 1, 10000 do
+                        ngx.print('data: {"id":"chatcmpl-1","object":'
+                            .. '"chat.completion.chunk","choices":[{"delta":'
+                            .. '{"content":"token"},"index":0,'
+                            .. '"finish_reason":null}],"usage":null}\\n\\n')
+                        ngx.flush(true)
+                        ngx.sleep(0.01)
+                    end
+                    -- Deliberately never send [DONE].
+                }
+            }
+
+            location /v1/oversized {
+                content_by_lua_block {
+                    -- Advertise a large Content-Length to trigger the
+                    -- non-streaming max_response_bytes pre-check.
+                    ngx.header["Content-Type"] = "application/json"
+                    ngx.header["Content-Length"] = "100000"
+                    ngx.print(string.rep("x", 100000))
+                }
+            }
+
+            location /v1/oversized_chunked {
+                content_by_lua_block {
+                    -- No Content-Length; chunked transfer. Exercises the
+                    -- incremental body_reader enforcement in parse_response.
+                    ngx.header["Content-Type"] = "application/json"
+                    -- Write in chunks so nginx uses chunked transfer-encoding.
+                    for i = 1, 10 do
+                        ngx.print(string.rep("x", 10000))
+                        ngx.flush(true)
+                    end
+                }
+            }
+        }
+_EOC_
+
+    $block->set_value("http_config", $http_config);
+});
+
+
+run_tests();
+
+__DATA__
+
+=== TEST 1: set route with max_stream_duration_ms against runaway SSE upstream
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                    "uri": "/anything",
+                    "plugins": {
+                        "ai-proxy": {
+                            "provider": "openai",
+                            "auth": {
+                                "header": {
+                                    "Authorization": "Bearer token"
+                                }
+                            },
+                            "options": {
+                                "model": "gpt-3.5-turbo",
+                                "stream": true
+                            },
+                            "max_stream_duration_ms": 500,
+                            "override": {
+                                "endpoint": "http://localhost:7740";
+                            },
+                            "ssl_verify": false
+                        }
+                    }
+                }]]
+            )
+
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 2: max_stream_duration_ms aborts the stream within budget
+--- config
+    location /t {
+        content_by_lua_block {
+            local http = require("resty.http")
+            local httpc = http.new()
+
+            local ok, err = httpc:connect({
+                scheme = "http",
+                host = "localhost",
+                port = ngx.var.server_port,
+            })
+            if not ok then
+                ngx.status = 500
+                ngx.say(err)
+                return
+            end
+
+            local start = ngx.now()
+            local res, err = httpc:request({
+                method = "POST",
+                headers = { ["Content-Type"] = "application/json" },
+                path = "/anything",
+                body = [[{"messages": [{"role": "user", "content": "hi"}]}]],
+            })
+            if not res then
+                ngx.status = 500
+                ngx.say(err)
+                return
+            end
+
+            -- Drain whatever the gateway is willing to send; it must end
+            -- within a few seconds because the plugin will close the
+            -- upstream after ~500ms.
+            local chunks = 0
+            while true do
+                local chunk, rerr = res.body_reader()
+                if rerr or not chunk then break end
+                chunks = chunks + 1
+            end
+            local elapsed = ngx.now() - start
+
+            -- The test mock would run for ~100s without the safeguard;
+            -- with the safeguard the whole exchange must finish in well
+            -- under 5s.
+            if elapsed >= 5 then
+                ngx.status = 500
+                ngx.say("stream did not abort in time: ", elapsed, "s")
+                return
+            end
+            if chunks == 0 then
+                ngx.status = 500
+                ngx.say("no chunks received")
+                return
+            end
+            ngx.say("ok")
+        }
+    }
+--- response_body
+ok
+--- error_log
+aborting AI stream: max_stream_duration_ms exceeded
+
+
+
+=== TEST 3: set route with max_response_bytes against runaway SSE upstream
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                    "uri": "/anything",
+                    "plugins": {
+                        "ai-proxy": {
+                            "provider": "openai",
+                            "auth": {
+                                "header": {
+                                    "Authorization": "Bearer token"
+                                }
+                            },
+                            "options": {
+                                "model": "gpt-3.5-turbo",
+                                "stream": true
+                            },
+                            "max_response_bytes": 2048,
+                            "override": {
+                                "endpoint": "http://localhost:7740";
+                            },
+                            "ssl_verify": false
+                        }
+                    }
+                }]]
+            )
+
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 4: max_response_bytes aborts the stream after byte budget
+--- config
+    location /t {
+        content_by_lua_block {
+            local http = require("resty.http")
+            local httpc = http.new()
+
+            local ok, err = httpc:connect({
+                scheme = "http",
+                host = "localhost",
+                port = ngx.var.server_port,
+            })
+            if not ok then
+                ngx.status = 500
+                ngx.say(err)
+                return
+            end
+
+            local res, err = httpc:request({
+                method = "POST",
+                headers = { ["Content-Type"] = "application/json" },
+                path = "/anything",
+                body = [[{"messages": [{"role": "user", "content": "hi"}]}]],
+            })
+            if not res then
+                ngx.status = 500
+                ngx.say(err)
+                return
+            end
+
+            local total = 0
+            local start = ngx.now()
+            while true do
+                local chunk, rerr = res.body_reader()
+                if rerr or not chunk then break end
+                total = total + #chunk
+            end
+            local elapsed = ngx.now() - start
+
+            if elapsed >= 5 then
+                ngx.status = 500
+                ngx.say("stream did not abort in time: ", elapsed, "s")
+                return
+            end
+            if total == 0 then
+                ngx.status = 500
+                ngx.say("no bytes received")
+                return
+            end
+            ngx.say("ok")
+        }
+    }
+--- response_body
+ok
+--- error_log
+aborting AI stream: max_response_bytes exceeded
+
+
+
+=== TEST 5: set route for non-streaming oversized Content-Length
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                    "uri": "/anything",
+                    "plugins": {
+                        "ai-proxy": {
+                            "provider": "openai",
+                            "auth": {
+                                "header": {
+                                    "Authorization": "Bearer token"
+                                }
+                            },
+                            "options": {
+                                "model": "gpt-3.5-turbo"
+                            },
+                            "max_response_bytes": 1024,
+                            "override": {
+                                "endpoint": 
"http://localhost:7740/v1/oversized";
+                            },
+                            "ssl_verify": false
+                        }
+                    }
+                }]]
+            )
+
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 6: non-streaming response with oversized Content-Length is rejected
+--- request
+POST /anything
+{"messages":[{"role":"user","content":"hi"}]}
+--- more_headers
+Content-Type: application/json
+--- error_code: 502
+--- error_log
+aborting AI response: Content-Length 100000 exceeds max_response_bytes 1024
+
+
+
+=== TEST 7: schema rejects non-positive max_stream_duration_ms
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                    "uri": "/anything",
+                    "plugins": {
+                        "ai-proxy": {
+                            "provider": "openai",
+                            "auth": {"header": {"Authorization": "Bearer x"}},
+                            "options": {"model": "gpt-3.5-turbo"},
+                            "max_stream_duration_ms": 0
+                        }
+                    }
+                }]]
+            )
+            ngx.status = code
+            ngx.say(body)
+        }
+    }
+--- error_code: 400
+--- response_body_like eval
+qr/max_stream_duration_ms/
+
+
+
+=== TEST 8: set route with max_response_bytes against chunked (no-CL) upstream
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/1',
+                 ngx.HTTP_PUT,
+                 [[{
+                    "uri": "/anything",
+                    "plugins": {
+                        "ai-proxy": {
+                            "provider": "openai",
+                            "auth": {
+                                "header": {
+                                    "Authorization": "Bearer token"
+                                }
+                            },
+                            "options": {
+                                "model": "gpt-3.5-turbo"
+                            },
+                            "max_response_bytes": 1024,
+                            "override": {
+                                "endpoint": 
"http://localhost:7740/v1/oversized_chunked";
+                            },
+                            "ssl_verify": false
+                        }
+                    }
+                }]]
+            )
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 9: chunked non-streaming response exceeding max_response_bytes 
returns 502
+--- request
+POST /anything
+{"messages":[{"role":"user","content":"hi"}]}
+--- more_headers
+Content-Type: application/json
+--- error_code: 502
+--- error_log
+aborting AI response: body size exceeds max_response_bytes 1024


Reply via email to