This is an automated email from the ASF dual-hosted git repository.

shreemaan-abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apisix.git


The following commit(s) were added to refs/heads/master by this push:
     new 944512b47 feat(ai-proxy): support Bedrock ConverseStream streaming (#13307)
944512b47 is described below

commit 944512b47b16c97ccdcae53d297c04eebfad7792
Author: Shreemaan Abhishek <[email protected]>
AuthorDate: Sat May 9 17:54:11 2026 +0800

    feat(ai-proxy): support Bedrock ConverseStream streaming (#13307)
    
    * feat(ai-proxy): support Bedrock ConverseStream streaming
    
    Phase 2 adds streaming via Bedrock's /converse-stream endpoint with
    AWS EventStream binary framing.
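    As a rough illustration of the prelude arithmetic, the sketch below
    reads the two big-endian length fields and derives the headers and
    payload ranges of one frame. It is portable Lua, not the codec itself.
    `frame_layout` is a hypothetical helper, and both CRC fields are
    ignored here (the real module checks them with ngx.crc32_long).

```lua
-- Read a big-endian unsigned 32-bit integer at 1-based position `pos`.
-- Returns nil when fewer than four bytes remain.
local function read_u32_be(s, pos)
  local b1, b2, b3, b4 = string.byte(s, pos, pos + 3)
  if not b4 then
    return nil
  end
  return b1 * 16777216 + b2 * 65536 + b3 * 256 + b4
end

-- Hypothetical helper: compute the 1-based inclusive ranges of the headers
-- and payload sections of the frame starting at `pos`. CRCs are NOT checked.
local function frame_layout(buf, pos)
  local total_length = read_u32_be(buf, pos)
  local headers_length = read_u32_be(buf, pos + 4)
  if not total_length or not headers_length then
    return nil, "truncated prelude"
  end
  -- 16 = 12-byte prelude + 4-byte trailing message CRC
  if total_length < 16 or headers_length > total_length - 16 then
    return nil, "inconsistent length fields"
  end
  local headers_start = pos + 12
  local payload_start = headers_start + headers_length
  local payload_end = pos + total_length - 5 -- last byte before message CRC
  return {
    headers = { headers_start, payload_start - 1 },
    payload = { payload_start, payload_end },
    payload_length = total_length - 16 - headers_length,
  }
end
```

    The constant 16 is the 12-byte prelude plus the 4-byte trailing message
    CRC. That is why the payload length is total_length - 16 - headers_length.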
    
    - new ai-transport.eventstream codec parses/encodes EventStream frames
      (12-byte prelude + headers + payload + trailing CRC, validated via
      ngx.crc32_long); same API surface as ai-transport.sse so providers
      pick a framing module by name
    - ai-providers.base.parse_streaming_response selects sse vs
      aws-eventstream framing via provider.streaming_framing
    - bedrock provider declares streaming_framing = "aws-eventstream" and
      routes ctx.var.request_type == "ai_stream" to /converse-stream
    - bedrock-converse protocol decodes EventStream events:
      contentBlockDelta -> texts; messageStop -> done; metadata ->
      usage_and_done; an "exception" :message-type -> done with a warning
    - streaming tests (single + multi) cover path routing, continued SigV4
      validation, forwarding of the raw EventStream bytes, token usage
      extracted from the metadata event, and a non-streaming control on
      the same route. The fixture is a real recorded /converse-stream
      response.
    - docs (en/zh) for ai-proxy and ai-proxy-multi updated
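    The event-to-action mapping in the bedrock-converse bullet can be
    sketched as a pure function. This is an illustration under stated
    assumptions, not the adapter. Plain Lua has no JSON decoder, so the
    hypothetical `classify_event` reads an already-decoded `event.data`
    table, whereas the real code decodes `event.payload` with
    core.json.decode. The warning log on exceptions is also omitted.

```lua
-- ASSUMPTION: event.data is the already-decoded JSON payload (hypothetical).
local function classify_event(event)
  if type(event) ~= "table" or type(event.headers) ~= "table" then
    return { type = "skip" }
  end
  local message_type = event.headers[":message-type"]
  if message_type == "exception" or message_type == "error" then
    -- Typed streaming error: end the stream (real code also logs a warning).
    return { type = "done" }
  end
  local event_type = event.headers[":event-type"]
  local data = event.data
  if event_type == "contentBlockDelta" then
    if type(data) == "table" and type(data.delta) == "table"
        and type(data.delta.text) == "string" then
      return { type = "delta", texts = { data.delta.text } }
    end
    return { type = "skip" } -- e.g. toolUse partial-JSON deltas
  elseif event_type == "messageStop" then
    return { type = "done" }
  elseif event_type == "metadata" and type(data) == "table"
      and type(data.usage) == "table" then
    local u = data.usage
    local input, output = u.inputTokens or 0, u.outputTokens or 0
    return {
      type = "usage_and_done",
      usage = {
        prompt_tokens = input,
        completion_tokens = output,
        total_tokens = u.totalTokens or (input + output),
      },
    }
  end
  return { type = "skip" } -- messageStart, contentBlockStart/Stop, ...
end
```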
    
    * refactor(ai-proxy): remove unused encode helper from eventstream codec
    
    The eventstream module's encode() function and write_u32_be/write_u16_be
    helpers had no callers (no tests, no plugins, no transport modules
    referenced them). Drop them to keep only what's exercised. Decode path
    is unchanged.
    
    * fix(ai-proxy): move FRAMINGS table below require() statements
    
    luacheck flagged sse/eventstream as undefined because the FRAMINGS map
    referenced them before the require() lines that define them. Move the
    table below the requires so the references resolve at parse time.
    
    * ci(eclint): exclude binary fixtures from whitespace rules
    
    The recorded /converse-stream EventStream fixture
    (t/fixtures/bedrock/bedrock-converse-streaming.bin) tripped eclint's
    trim_trailing_whitespace rule because the random binary content happened
    to include the byte sequence 0x20 0x0a. Add a [*.bin] block mirroring
    the existing [*.pb] precedent so binary fixtures are skipped by
    EditorConfig-aware tools.
    
    * fix(ai-proxy): localize tostring in base.lua
    
    The streaming framing error path uses tostring() to format the framing
    name when it's missing or unknown. EE's lj-releng linter requires every
    Lua global referenced inside a module to be aliased to a local at the
    top of the file. Add the alias for parity with other locals already
    declared (table, pairs, type, math, ipairs, setmetatable).
    
    * fix(ai-proxy): validate buffer bounds for fixed-width EventStream header types
    
    parse_headers() validated the bounds of TYPE_STRING and TYPE_BYTE_ARRAY
    values but not the fixed-width types (BYTE, SHORT, INTEGER, LONG,
    TIMESTAMP, UUID). On a truncated headers section, read_u16_be /
    read_u32_be silently return nil and that nil was being stored as the
    header value, masking the framing error. Add explicit
    pos + size - 1 <= stop checks before each fixed-width read so a
    malformed frame surfaces a clear error instead of a partially parsed
    event.
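    One way to express the `pos + size - 1 <= stop` rule is a width table
    keyed by the EventStream value-type tags. `FIXED_WIDTH` and
    `read_fixed` are hypothetical names. Unlike the codec, this sketch
    returns raw bytes for every type instead of decoding SHORT/INTEGER
    into numbers.

```lua
-- Widths of the fixed-size header value types (type tag -> byte count).
local FIXED_WIDTH = {
  [2] = 1,  -- BYTE
  [3] = 2,  -- SHORT
  [4] = 4,  -- INTEGER
  [5] = 8,  -- LONG
  [8] = 8,  -- TIMESTAMP
  [9] = 16, -- UUID
}

-- Read a fixed-width value at `pos` without crossing `stop`, the inclusive
-- end of the headers section. Returns raw bytes and the next position, or
-- nil plus an error when the section is truncated.
local function read_fixed(s, pos, stop, value_type)
  local size = FIXED_WIDTH[value_type]
  if not size then
    return nil, "not a fixed-width type: " .. tostring(value_type)
  end
  if pos + size - 1 > stop then
    return nil, "truncated fixed-width header value"
  end
  return string.sub(s, pos, pos + size - 1), pos + size
end
```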
    
    * docs(ai-proxy): correct split_buf corrupt-prelude comment
    
    The comment claimed decode() would surface the error, but split_buf
    only invokes decode() on bytes it has already advanced past. When the
    corrupt prelude is at offset 0, split_buf returns ("", buf) and
    decode() is never called on those bytes. Update the comment to
    describe the actual behavior in both cases (corrupt frame at offset
    0 vs. after some valid frames).
    
    * fix(ai-proxy): don't log raw Bedrock exception payloads
    
    The streaming exception path logged event.payload verbatim. Bedrock
    exception payloads are upstream-controlled JSON and may include partial
    model output, prompt fragments, or other request content. Log only
    the typed error and the payload size to avoid leaking that data into
    error logs.
    
    * fix(ai-proxy): use framing-specific buffer cap in streaming loop
    
    The 1 MiB MAX_SSE_BUF_SIZE was applied to every framing. AWS EventStream
    allows frames up to 16 MiB; a valid >1 MiB frame split across reads
    would have its in-progress bytes silently discarded by the cap.
    
    Each framing module now exposes its own `max_remainder` (sse: 1 MiB,
    unchanged; eventstream: MAX_FRAME_SIZE = 16 MiB), and
    parse_streaming_response reads framing.max_remainder instead of the
    hardcoded constant. A 1 MiB fallback is kept for any framing that
    forgets to declare one.
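    The cap selection reduces to a table lookup with a default. In this
    sketch the framing modules are stub tables carrying only
    `max_remainder`. `legacy-stub` is a made-up framing that declares no
    cap. `select_framing` and `cap_remainder` are hypothetical helpers that
    mirror the streaming loop's behavior.

```lua
local DEFAULT_MAX_REMAINDER = 1024 * 1024 -- 1 MiB fallback

-- Stub framing modules; only the max_remainder field matters here.
local FRAMINGS = {
  sse = { max_remainder = 1024 * 1024 },
  ["aws-eventstream"] = { max_remainder = 16 * 1024 * 1024 },
  ["legacy-stub"] = {}, -- hypothetical framing that forgot to declare a cap
}

-- Return the framing and its effective remainder cap, or nil plus an error.
local function select_framing(name)
  local framing = FRAMINGS[name or "sse"]
  if not framing then
    return nil, "unknown streaming framing: " .. tostring(name)
  end
  return framing, framing.max_remainder or DEFAULT_MAX_REMAINDER
end

-- Drop the in-progress bytes once they exceed the cap, as the loop does.
local function cap_remainder(remainder, max_remainder)
  if #remainder > max_remainder then
    return ""
  end
  return remainder
end
```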
    
    * fix(ai-proxy): validate prelude CRC inside split_buf
    
    split_buf advanced pos based on total_length alone. If a chunk
    contained [valid_frame, corrupt_frame, valid_frame], split_buf would
    consume all three into `complete`, decode() would succeed on frame 1
    then stop on frame 2's CRC mismatch, and frame 3 would be silently
    lost — its bytes had already been advanced past and weren't preserved
    in remainder.
    
    Validate the prelude CRC inside split_buf before advancing. On
    mismatch, leave the corrupt frame and everything after it in
    remainder so the caller (or max_remainder cap) can handle it. Message
    CRC is still checked by decode() — accepted trade-off vs. CRCing the
    full frame twice on every iteration.
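    The [valid_frame, corrupt_frame, valid_frame] case can be reproduced
    outside OpenResty with the sketch below. It rests on three assumptions.
    First, a bit-operator-free CRC-32/IEEE (`bxor`, `crc32`) stands in for
    ngx.crc32_long. Second, `make_frame` is a hypothetical helper that
    builds header-less frames. Third, `split_buf` here is a standalone
    reimplementation of the prelude-CRC logic described above; it does not
    call into the real module.

```lua
local MAX_FRAME_SIZE = 16 * 1024 * 1024

-- XOR of two non-negative integers below 2^32 using arithmetic only, so
-- the sketch runs on Lua 5.1 through 5.4 without a bit library.
local function bxor(a, b)
  local r, p = 0, 1
  while a > 0 or b > 0 do
    local ra, rb = a % 2, b % 2
    if ra ~= rb then
      r = r + p
    end
    a, b, p = (a - ra) / 2, (b - rb) / 2, p * 2
  end
  return r
end

-- Bitwise CRC-32/IEEE (reflected polynomial 0xEDB88320).
local function crc32(s)
  local crc = 0xFFFFFFFF
  for i = 1, #s do
    crc = bxor(crc, string.byte(s, i))
    for _ = 1, 8 do
      local lsb = crc % 2
      crc = (crc - lsb) / 2
      if lsb == 1 then
        crc = bxor(crc, 0xEDB88320)
      end
    end
  end
  return bxor(crc, 0xFFFFFFFF)
end

local function read_u32_be(s, pos)
  local b1, b2, b3, b4 = string.byte(s, pos, pos + 3)
  if not b4 then
    return nil
  end
  return b1 * 16777216 + b2 * 65536 + b3 * 256 + b4
end

local function u32_be(n)
  return string.char(math.floor(n / 16777216) % 256, math.floor(n / 65536) % 256,
                     math.floor(n / 256) % 256, n % 256)
end

-- Advance only past frames whose length is sane, whose bytes are all
-- present, and whose prelude CRC matches; everything else is remainder.
local function split_buf(buf)
  local len, pos = #buf, 1
  while pos + 11 <= len do
    local total_length = read_u32_be(buf, pos)
    if total_length < 16 or total_length > MAX_FRAME_SIZE then
      break -- corrupt length field
    end
    if pos + total_length - 1 > len then
      break -- frame not fully buffered yet
    end
    if crc32(string.sub(buf, pos, pos + 7)) ~= read_u32_be(buf, pos + 8) then
      break -- bad prelude CRC: keep it and everything after it
    end
    pos = pos + total_length
  end
  return string.sub(buf, 1, pos - 1), string.sub(buf, pos)
end

-- Hypothetical test helper: a header-less frame with valid CRCs.
local function make_frame(payload)
  local prelude = u32_be(16 + #payload) .. u32_be(0)
  local head = prelude .. u32_be(crc32(prelude)) .. payload
  return head .. u32_be(crc32(head))
end
```

    With a corrupt prelude at offset 0, nothing is consumed and the whole
    buffer comes back as the remainder. That matches the corrected
    split_buf comment. The max_remainder cap is what eventually discards
    those bytes if no resync happens.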
    
    * fix(ai-proxy): localize tostring in eventstream.lua
    
    The decode() warn path uses tostring() to format an invalid total_length
    field. lj-releng requires every Lua global referenced inside a module
    to be aliased to a local at the top of the file.
    
    * fix(ai-proxy): dispatch streaming response when upstream is AWS EventStream
    
    The streaming-vs-non-streaming choice in ai-proxy/base.lua was made by
    testing whether the upstream Content-Type contains 'text/event-stream'.
    Bedrock ConverseStream returns 'application/vnd.amazon.eventstream',
    which doesn't match — so even though the gateway correctly routed the
    request to /converse-stream, the response was processed by
    parse_response (non-streaming), bypassing the framing-aware
    parse_streaming_response that decodes events for usage and response-text
    extraction.
    
    In passthrough mode (Bedrock has no converter) the bytes still reached
    the client because parse_response falls through to lua_response_filter
    on a JSON-decode failure, but llm_prompt_tokens / llm_completion_tokens /
    llm_response_text were never populated and the SSE-event parser was
    never invoked.
    
    Recognize 'application/vnd.amazon.eventstream' as a streaming response
    content-type alongside 'text/event-stream', and switch to plain-text
    matching so the dot in 'application/vnd.amazon.eventstream' isn't
    interpreted as a regex wildcard.
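    The plain-text check can be sketched with stock string.find and its
    fourth `plain` argument. `is_streaming_content_type` is a hypothetical
    helper. The actual change calls APISIX's core.string.find with
    `1, true`.

```lua
-- Content-types that mark an upstream response as streaming.
local STREAMING_CONTENT_TYPES = {
  "text/event-stream",
  "application/vnd.amazon.eventstream",
}

-- Substring match with plain = true, so "." and "-" are taken literally
-- instead of as Lua pattern magic characters.
local function is_streaming_content_type(content_type)
  if type(content_type) ~= "string" then
    return false
  end
  for _, needle in ipairs(STREAMING_CONTENT_TYPES) do
    if string.find(content_type, needle, 1, true) then
      return true
    end
  end
  return false
end
```

    In Lua patterns, `-` is also a quantifier (lazy repetition). So even
    "text/event-stream" behaves differently as a pattern: it also matches
    "text/evenstream".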
    
    * test(ai-proxy): assert streaming pipeline runs in Bedrock TEST 19
    
    The previous TEST 19 only checked that the raw EventStream bytes were
    forwarded to the client and that token-related strings appeared in the
    binary. Both of those would still pass even if parse_streaming_response
    never ran (parse_response falls through to plain byte forwarding on
    JSON-decode failure, and the binary contains 'inputTokens'/'outputTokens'
    literally), so the gateway's usage-extraction path wasn't actually
    exercised.
    
    Add an error_log assertion for 'got token usage from ai service' — that
    log line is only emitted from parse_streaming_response when the Bedrock
    metadata frame is parsed and merge_usage() runs. This catches a
    regression where the response handler bypasses parse_streaming_response
    (as the just-fixed Content-Type dispatch bug did).
    
    * refactor(ai-proxy): rename eventstream module to aws-eventstream
    
    The transport module implements AWS EventStream binary framing — a name
    specific to AWS, not a generic event stream concept. Renaming so reviewers
    and maintainers can tell at a glance that this is AWS-specific.
---
 .editorconfig                                     |   6 +
 apisix/plugins/ai-protocols/bedrock-converse.lua  |  89 ++++++-
 apisix/plugins/ai-providers/base.lua              |  40 ++-
 apisix/plugins/ai-providers/bedrock.lua           |  10 +-
 apisix/plugins/ai-proxy/base.lua                  |  12 +-
 apisix/plugins/ai-transport/aws-eventstream.lua   | 292 ++++++++++++++++++++++
 apisix/plugins/ai-transport/sse.lua               |   8 +-
 docs/en/latest/plugins/ai-proxy-multi.md          |  21 +-
 docs/en/latest/plugins/ai-proxy.md                |  32 ++-
 docs/zh/latest/plugins/ai-proxy-multi.md          |  21 +-
 docs/zh/latest/plugins/ai-proxy.md                |  32 ++-
 t/fixtures/bedrock/bedrock-converse-streaming.bin | Bin 0 -> 1086 bytes
 t/lib/server.lua                                  | 120 ++++++++-
 t/plugin/ai-proxy-bedrock-single.t                |  70 ++++++
 t/plugin/ai-proxy-bedrock.t                       | 118 +++++++++
 15 files changed, 810 insertions(+), 61 deletions(-)

diff --git a/.editorconfig b/.editorconfig
index 38c0dec25..925edb3fb 100644
--- a/.editorconfig
+++ b/.editorconfig
@@ -52,6 +52,12 @@ insert_final_newline = unset
 trim_trailing_whitespace = unset
 end_of_line = unset
 
+[*.bin]
+indent_style = unset
+insert_final_newline = unset
+trim_trailing_whitespace = unset
+end_of_line = unset
+
 [*.sse]
 trim_trailing_whitespace = unset
 insert_final_newline = unset
diff --git a/apisix/plugins/ai-protocols/bedrock-converse.lua b/apisix/plugins/ai-protocols/bedrock-converse.lua
index 0e0d2061a..5c7112c06 100644
--- a/apisix/plugins/ai-protocols/bedrock-converse.lua
+++ b/apisix/plugins/ai-protocols/bedrock-converse.lua
@@ -17,7 +17,9 @@
 
 --- Bedrock Converse protocol adapter (client-side).
 -- Handles detection and response parsing for the Amazon Bedrock
--- Converse API format. Non-streaming only in this phase.
+-- Converse API format. Streaming uses the /converse-stream endpoint with
+-- AWS EventStream binary framing; this module's parse_sse_event consumes
+-- the {headers, payload} event shape produced by ai-transport.aws-eventstream.
 
 local core = require("apisix.core")
 local string_sub = string.sub
@@ -38,22 +40,95 @@ end
 
 
 --- Check whether the request is a streaming request.
--- Streaming is not supported in this phase.
+-- The Bedrock Converse API itself has no `stream` body field — we use
+-- `body.stream = true` as the gateway-side opt-in to route the request to
+-- /converse-stream (which returns AWS EventStream binary frames).
 function _M.is_streaming(body)
-    return false
+    return type(body) == "table" and body.stream == true
 end
 
 
 --- Prepare the outgoing request body for the target provider.
--- TODO: support streaming. Bedrock uses a separate /converse-stream endpoint
--- with the AWS EventStream binary protocol, which we don't yet implement.
--- For now, strip the `stream` field so we only send non-streaming requests
--- to /converse.
+-- Strip our gateway-side `stream` flag; Bedrock rejects unknown body fields
+-- and decides streaming purely by URL (/converse vs /converse-stream).
 function _M.prepare_outgoing_request(body)
     body.stream = nil
 end
 
 
+--- Parse a streaming event from Bedrock's EventStream framing.
+-- Event shape comes from ai-transport.aws-eventstream: {headers, payload}.
+-- Bedrock standard headers: :event-type, :content-type, :message-type.
+-- :message-type is "event" for normal events and "exception" for typed
+-- streaming errors (throttlingException, modelStreamErrorException, etc.).
+function _M.parse_sse_event(event, ctx, state)
+    if type(event) ~= "table" or type(event.headers) ~= "table" then
+        return { type = "skip" }
+    end
+
+    local message_type = event.headers[":message-type"]
+    if message_type == "exception" or message_type == "error" then
+        local err_type = event.headers[":exception-type"]
+                         or event.headers[":error-code"]
+                         or "unknown"
+        -- Don't log event.payload: it's upstream-controlled JSON that may
+        -- contain partial completions, prompt fragments, or other request
+        -- content. Log just the typed error and the payload size.
+        core.log.warn("Bedrock streaming exception: type=", err_type,
+                      ", payload_size=", #(event.payload or ""))
+        return { type = "done" }
+    end
+
+    local event_type = event.headers[":event-type"]
+    if not event_type then
+        return { type = "skip" }
+    end
+
+    if event_type == "contentBlockDelta" then
+        local data, err = core.json.decode(event.payload, { null_as_nil = true })
+        if not data then
+            core.log.warn("failed to decode contentBlockDelta payload: ", err)
+            return { type = "skip" }
+        end
+        if type(data.delta) == "table" and type(data.delta.text) == "string" then
+            return { type = "delta", texts = { data.delta.text } }
+        end
+        -- toolUse partial-JSON deltas and other delta shapes don't contribute
+        -- to llm_response_text; let the raw bytes pass through unchanged.
+        return { type = "skip" }
+    end
+
+    if event_type == "messageStop" then
+        return { type = "done" }
+    end
+
+    if event_type == "metadata" then
+        local data, err = core.json.decode(event.payload, { null_as_nil = true })
+        if not data or type(data.usage) ~= "table" then
+            if err then
+                core.log.warn("failed to decode metadata payload: ", err)
+            end
+            return { type = "skip" }
+        end
+        local raw = data.usage
+        return {
+            type = "usage_and_done",
+            usage = {
+                prompt_tokens = raw.inputTokens or 0,
+                completion_tokens = raw.outputTokens or 0,
+                total_tokens = raw.totalTokens
+                    or (raw.inputTokens or 0) + (raw.outputTokens or 0),
+            },
+            raw_usage = raw,
+        }
+    end
+
+    -- messageStart / contentBlockStart / contentBlockStop carry no metrics
+    -- or visible text and aren't surfaced to the client beyond passthrough.
+    return { type = "skip" }
+end
+
+
 --- Extract token usage from a non-streaming Bedrock response.
 -- Bedrock format: res_body.usage.inputTokens / outputTokens / totalTokens
 function _M.extract_usage(res_body)
diff --git a/apisix/plugins/ai-providers/base.lua b/apisix/plugins/ai-providers/base.lua
index f16b72bbb..944be2632 100644
--- a/apisix/plugins/ai-providers/base.lua
+++ b/apisix/plugins/ai-providers/base.lua
@@ -25,13 +25,11 @@ local mt = {
     __index = _M
 }
 
--- Maximum SSE buffer size per request (1 MB).
-local MAX_SSE_BUF_SIZE = 1024 * 1024
-
 local core = require("apisix.core")
 local plugin = require("apisix.plugin")
 local url  = require("socket.url")
 local sse  = require("apisix.plugins.ai-transport.sse")
+local aws_eventstream = require("apisix.plugins.ai-transport.aws-eventstream")
 local transport_http = require("apisix.plugins.ai-transport.http")
 local transport_auth = require("apisix.plugins.ai-transport.auth")
 local log_sanitize = require("apisix.utils.log-sanitize")
@@ -48,6 +46,16 @@ local type  = type
 local math  = math
 local ipairs = ipairs
 local setmetatable = setmetatable
+local tostring = tostring
+
+-- Streaming framings selectable via provider.streaming_framing.
+-- Each module exposes split_buf(buf) -> (complete, remainder) and
+-- decode(buf) -> array of events. The event shape is framing-specific;
+-- the protocol's parse_sse_event must understand it.
+local FRAMINGS = {
+    sse = sse,
+    ["aws-eventstream"] = aws_eventstream,
+}
 
 
 function _M.new(opt)
@@ -367,13 +375,20 @@ function _M.parse_response(self, ctx, res, client_proto, converter, conf)
 end
 
 
---- Process streaming SSE response.
--- Uses target protocol for SSE parsing and converter (if present) for
--- transforming events to client format.
+--- Process streaming response.
+-- Uses target protocol for event parsing and converter (if present) for
+-- transforming events to client format. The wire framing (SSE vs AWS
+-- EventStream binary) is selected by provider.streaming_framing; the
+-- protocol module's parse_sse_event must understand the resulting event
+-- shape.
 -- @param target_proto table The protocol module for the provider's native protocol
 -- @param converter table|nil The converter module (if protocol conversion needed)
 -- @param conf table|nil Plugin configuration (used for stream duration and size limits)
 function _M.parse_streaming_response(self, ctx, res, target_proto, converter, conf)
+    local framing = FRAMINGS[self.streaming_framing or "sse"]
+    if not framing then
+        return 500, "unknown streaming framing: " .. tostring(self.streaming_framing)
+    end
     local body_reader = res.body_reader
     local contents = {}
     local sse_state = { is_first = true }
@@ -415,7 +430,7 @@ function _M.parse_streaming_response(self, ctx, res, target_proto, converter, co
         end
         if not chunk then
             if #sse_buf > 0 then
-                core.log.warn("dropping incomplete SSE frame at EOF, size: ",
+                core.log.warn("dropping incomplete stream frame at EOF, size: ",
                               #sse_buf)
             end
 
@@ -423,7 +438,7 @@ function _M.parse_streaming_response(self, ctx, res, target_proto, converter, co
             if converter and not output_sent then
                 local msg = "streaming response completed without producing "
                             .. "any output; the upstream likely returned a "
-                            .. "different SSE format than the converter expects"
+                            .. "different stream format than the converter expects"
                 core.log.error(msg)
                 return 502, msg
             end
@@ -438,13 +453,14 @@ function _M.parse_streaming_response(self, ctx, res, target_proto, converter, co
         end
 
         sse_buf = sse_buf .. chunk
-        local complete, remainder = sse.split_buf(sse_buf)
-        if #remainder > MAX_SSE_BUF_SIZE then
-            core.log.warn("SSE remainder exceeded ", MAX_SSE_BUF_SIZE, " bytes, resetting")
+        local complete, remainder = framing.split_buf(sse_buf)
+        local max_remainder = framing.max_remainder or 1024 * 1024
+        if #remainder > max_remainder then
+            core.log.warn("stream remainder exceeded ", max_remainder, " bytes, resetting")
             remainder = ""
         end
         sse_buf = remainder
-        local events = complete ~= "" and sse.decode(complete) or {}
+        local events = complete ~= "" and framing.decode(complete) or {}
         ctx.llm_response_contents_in_chunk = {}
         local converted_chunks = {}
 
diff --git a/apisix/plugins/ai-providers/bedrock.lua b/apisix/plugins/ai-providers/bedrock.lua
index 8fb2f0a1e..e4b826ec6 100644
--- a/apisix/plugins/ai-providers/bedrock.lua
+++ b/apisix/plugins/ai-providers/bedrock.lua
@@ -21,6 +21,7 @@ local ngx_escape_uri = ngx.escape_uri
 
 local host_template = "bedrock-runtime.%s.amazonaws.com"
 local chat_path_template = "/model/%s/converse"
+local stream_path_template = "/model/%s/converse-stream"
 
 local function get_host(region)
     return str_fmt(host_template, region)
@@ -57,6 +58,9 @@ return require("apisix.plugins.ai-providers.base").new({
     get_node = get_node,
     remove_model = true,
     aws_sigv4 = true,
+    -- Bedrock ConverseStream uses AWS EventStream binary framing on the
+    -- /converse-stream endpoint, not Server-Sent Events.
+    streaming_framing = "aws-eventstream",
     capabilities = {
         ["bedrock-converse"] = {
             host = function(conf)
@@ -75,7 +79,11 @@ return require("apisix.plugins.ai-providers.base").new({
                 -- contain "/" (e.g. "...:application-inference-profile/abc")
                 -- and ":". auth-aws.lua's normalize_and_encode_path is
                 -- idempotent so this pre-encoding is preserved end-to-end.
-                return str_fmt(chat_path_template, ngx_escape_uri(model))
+                local template = chat_path_template
+                if ctx.var.request_type == "ai_stream" then
+                    template = stream_path_template
+                end
+                return str_fmt(template, ngx_escape_uri(model))
             end,
             rewrite_request_body = rewrite_converse_request_body,
         },
diff --git a/apisix/plugins/ai-proxy/base.lua b/apisix/plugins/ai-proxy/base.lua
index 2996d6413..745f3b7a5 100644
--- a/apisix/plugins/ai-proxy/base.lua
+++ b/apisix/plugins/ai-proxy/base.lua
@@ -286,8 +286,18 @@ function _M.before_proxy(conf, ctx, on_error)
             core.response.set_header("Content-Type", content_type)
 
             -- Step 5: Parse response
+            -- Streaming responses arrive with provider-specific framing
+            -- content-types: SSE for OpenAI/Anthropic/etc., AWS EventStream
+            -- binary frames for Bedrock ConverseStream. The framing module
+            -- is selected inside parse_streaming_response via
+            -- provider.streaming_framing.
             local code, body
-            if content_type and core.string.find(content_type, "text/event-stream") then
+            local is_streaming_resp = content_type and (
+                core.string.find(content_type, "text/event-stream", 1, true) or
+                core.string.find(content_type,
+                                 "application/vnd.amazon.eventstream", 1, true)
+            )
+            if is_streaming_resp then
                 local target_proto_module = protocols.get(target_proto)
                 if not target_proto_module then
                     core.log.error("no protocol module for streaming target: ", target_proto)
diff --git a/apisix/plugins/ai-transport/aws-eventstream.lua b/apisix/plugins/ai-transport/aws-eventstream.lua
new file mode 100644
index 000000000..09f017843
--- /dev/null
+++ b/apisix/plugins/ai-transport/aws-eventstream.lua
@@ -0,0 +1,292 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--- AWS EventStream binary framing codec.
+-- Used by Bedrock ConverseStream, Kinesis SubscribeToShard, S3 SelectObjectContent,
+-- and Transcribe streaming. Each frame:
+--
+--   prelude (12 bytes)
+--     total_length    BE uint32  -- entire frame including trailing CRC
+--     headers_length  BE uint32  -- size of the headers section
+--     prelude_crc     BE uint32  -- CRC32 of the first 8 bytes
+--   headers (headers_length bytes)
+--     repeated entries: name_len(u8) name(bytes) value_type(u8) value...
+--   payload (total_length - 16 - headers_length bytes)
+--   message_crc (4 bytes BE uint32)  -- CRC32 of bytes [0, total_length-4)
+--
+-- This module provides the same API surface as ai-transport.sse so the base
+-- provider can pick a framing module by name.
+
+local core = require("apisix.core")
+local ngx = ngx
+local ngx_crc32 = ngx.crc32_long
+local string_byte = string.byte
+local string_sub = string.sub
+local tostring = tostring
+
+-- Hard cap on a single frame size to avoid memory blowups on malformed input.
+-- AWS documents ConverseStream frames as well under 1 MiB; pick 16 MiB to be
+-- safe for other services that use this codec.
+local MAX_FRAME_SIZE = 16 * 1024 * 1024
+
+-- Header value type tags (AWS EventStream spec). Bedrock only sends type 7
+-- (string), but we decode 6/7 for robustness against other AWS services.
+local TYPE_TRUE       = 0
+local TYPE_FALSE      = 1
+local TYPE_BYTE       = 2
+local TYPE_SHORT      = 3
+local TYPE_INTEGER    = 4
+local TYPE_LONG       = 5
+local TYPE_BYTE_ARRAY = 6
+local TYPE_STRING     = 7
+local TYPE_TIMESTAMP  = 8
+local TYPE_UUID       = 9
+
+local _M = {
+    -- Cap on bytes split_buf may leave in `remainder`. The streaming loop
+    -- in ai-providers.base uses this to bound the buffer when a frame has
+    -- not yet completed. A single in-progress frame can be up to
+    -- MAX_FRAME_SIZE bytes, so the remainder cap matches that.
+    max_remainder = MAX_FRAME_SIZE,
+}
+
+
+local function read_u32_be(s, pos)
+    local b1, b2, b3, b4 = string_byte(s, pos, pos + 3)
+    if not b4 then
+        return nil
+    end
+    return b1 * 16777216 + b2 * 65536 + b3 * 256 + b4
+end
+
+
+local function read_u16_be(s, pos)
+    local b1, b2 = string_byte(s, pos, pos + 1)
+    if not b2 then
+        return nil
+    end
+    return b1 * 256 + b2
+end
+
+
+-- Decode a single frame's headers section.
+-- @param s string  Full frame buffer
+-- @param start int  1-based start of headers
+-- @param stop int   1-based end of headers (inclusive)
+-- @return table|nil  Map of header name -> string value
+-- @return string|nil error
+local function parse_headers(s, start, stop)
+    local headers = {}
+    local pos = start
+    while pos <= stop do
+        local name_len = string_byte(s, pos)
+        if not name_len then
+            return nil, "truncated header entry"
+        end
+        pos = pos + 1
+        if pos + name_len - 1 > stop then
+            return nil, "header name extends past headers section"
+        end
+        local name = string_sub(s, pos, pos + name_len - 1)
+        pos = pos + name_len
+
+        local value_type = string_byte(s, pos)
+        if not value_type then
+            return nil, "missing header value type"
+        end
+        pos = pos + 1
+
+        if value_type == TYPE_STRING or value_type == TYPE_BYTE_ARRAY then
+            local val_len = read_u16_be(s, pos)
+            if not val_len then
+                return nil, "truncated header value length"
+            end
+            pos = pos + 2
+            if pos + val_len - 1 > stop then
+                return nil, "header value extends past headers section"
+            end
+            headers[name] = string_sub(s, pos, pos + val_len - 1)
+            pos = pos + val_len
+        elseif value_type == TYPE_TRUE then
+            headers[name] = true
+        elseif value_type == TYPE_FALSE then
+            headers[name] = false
+        elseif value_type == TYPE_BYTE then
+            if pos > stop then
+                return nil, "truncated header byte value"
+            end
+            headers[name] = string_byte(s, pos)
+            pos = pos + 1
+        elseif value_type == TYPE_SHORT then
+            if pos + 1 > stop then
+                return nil, "truncated header short value"
+            end
+            headers[name] = read_u16_be(s, pos)
+            pos = pos + 2
+        elseif value_type == TYPE_INTEGER then
+            if pos + 3 > stop then
+                return nil, "truncated header integer value"
+            end
+            headers[name] = read_u32_be(s, pos)
+            pos = pos + 4
+        elseif value_type == TYPE_LONG then
+            if pos + 7 > stop then
+                return nil, "truncated header long value"
+            end
+            -- 64-bit ints don't fit in a Lua double; keep raw bytes.
+            headers[name] = string_sub(s, pos, pos + 7)
+            pos = pos + 8
+        elseif value_type == TYPE_TIMESTAMP then
+            if pos + 7 > stop then
+                return nil, "truncated header timestamp value"
+            end
+            headers[name] = string_sub(s, pos, pos + 7)
+            pos = pos + 8
+        elseif value_type == TYPE_UUID then
+            if pos + 15 > stop then
+                return nil, "truncated header uuid value"
+            end
+            headers[name] = string_sub(s, pos, pos + 15)
+            pos = pos + 16
+        else
+            return nil, "unknown header value type: " .. value_type
+        end
+    end
+    return headers
+end
+
+
+--- Split a buffer at the last complete frame boundary.
+-- A "complete" frame here is one whose prelude length fields are sane,
+-- whose full byte range is present, AND whose prelude CRC validates.
+-- The CRC check matters because split_buf advances pos based on
+-- total_length alone — without it, a frame with a sane length but bad
+-- prelude CRC would be consumed into `complete`, decode() would stop on
+-- it, and any valid frames behind it in the same chunk would be lost
+-- (already past pos, not preserved in remainder). Validating here keeps
+-- corrupt frames in `remainder` so the caller can either resync or trip
+-- max_remainder. Message CRC is intentionally not checked here (decode()
+-- handles that); a frame with a good prelude but bad payload CRC is rare
+-- and any frames behind it in the same chunk would still be advanced
+-- past — accepted trade-off vs. the cost of computing the message CRC
+-- twice on every frame.
+-- @param buf string
+-- @return string complete   Concatenated complete frames (or "" if none).
+-- @return string remainder  Bytes after the last complete frame.
+function _M.split_buf(buf)
+    local len = #buf
+    local pos = 1
+    while pos + 11 <= len do
+        local total_length = read_u32_be(buf, pos)
+        if not total_length or total_length < 16 or total_length > MAX_FRAME_SIZE then
+            -- Corrupt total_length field. Stop and leave the bytes in the
+            -- remainder; decode() never sees them.
+            break
+        end
+        if pos + total_length - 1 > len then
+            -- Frame not yet fully in buffer — wait for more chunks.
+            break
+        end
+        local prelude_crc = read_u32_be(buf, pos + 8)
+        if ngx_crc32(string_sub(buf, pos, pos + 7)) ~= prelude_crc then
+            -- Prelude CRC mismatch. Don't advance: keep this corrupt
+            -- frame and everything after in `remainder`, so we don't
+            -- silently consume valid frames sitting behind it.
+            break
+        end
+        pos = pos + total_length
+    end
+    if pos == 1 then
+        return "", buf
+    end
+    return string_sub(buf, 1, pos - 1), string_sub(buf, pos)
+end
+
+
+--- Decode a buffer of complete frames into events.
+-- @param buf string  Buffer containing zero or more complete frames.
+-- @return table  Array of {headers = {string -> string}, payload = string}.
+function _M.decode(buf)
+    local events = {}
+    local len = #buf
+    local pos = 1
+    while pos <= len do
+        if pos + 11 > len then
+            core.log.warn("aws-eventstream: truncated prelude at offset ", pos - 1,
+                          " (buffer ", len, " bytes)")
+            return events
+        end
+        local total_length = read_u32_be(buf, pos)
+        local headers_length = read_u32_be(buf, pos + 4)
+        local prelude_crc = read_u32_be(buf, pos + 8)
+
+        if not total_length or total_length < 16 or total_length > MAX_FRAME_SIZE then
+            core.log.warn("aws-eventstream: invalid total_length ",
+                          tostring(total_length), " at offset ", pos - 1)
+            return events
+        end
+        if headers_length > total_length - 16 then
+            core.log.warn("aws-eventstream: headers_length ", headers_length,
+                          " exceeds frame body")
+            return events
+        end
+        if pos + total_length - 1 > len then
+            core.log.warn("aws-eventstream: incomplete frame at offset ", pos - 1)
+            return events
+        end
+
+        local computed_prelude_crc = ngx_crc32(string_sub(buf, pos, pos + 7))
+        if computed_prelude_crc ~= prelude_crc then
+            core.log.warn("aws-eventstream: prelude CRC mismatch at offset ", pos - 1,
+                          " expected ", prelude_crc, " got ", computed_prelude_crc)
+            return events
+        end
+
+        local headers_start = pos + 12
+        local payload_start = headers_start + headers_length
+        local payload_end = pos + total_length - 5  -- inclusive
+        local message_crc = read_u32_be(buf, payload_end + 1)
+
+        local computed_message_crc = ngx_crc32(string_sub(buf, pos, payload_end))
+        if computed_message_crc ~= message_crc then
+            core.log.warn("aws-eventstream: message CRC mismatch at offset ", pos - 1,
+                          " expected ", message_crc, " got ", computed_message_crc)
+            return events
+        end
+
+        local headers, herr
+        if headers_length > 0 then
+            headers, herr = parse_headers(buf, headers_start, payload_start - 1)
+            if not headers then
+                core.log.warn("aws-eventstream: failed to parse headers: ", herr)
+                return events
+            end
+        else
+            headers = {}
+        end
+
+        events[#events + 1] = {
+            headers = headers,
+            payload = string_sub(buf, payload_start, payload_end),
+        }
+        pos = pos + total_length
+    end
+    return events
+end
+
+
+return _M
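To make the frame layout that `split_buf()` and `decode()` walk concrete, here is a minimal Python sketch (not part of the commit) that builds frames and splits a buffer the same way. Each frame is a 12-byte prelude (`total_length`, `headers_length`, and a CRC over those first 8 bytes), followed by the headers and payload, and then a trailing CRC over every preceding byte. The sketch assumes that `zlib.crc32` computes the same CRC-32 as `ngx.crc32_long`. `MAX_FRAME_SIZE` is a placeholder because the module's real value is outside this excerpt, and `encode_frame` is a hypothetical test helper.

```python
import struct
import zlib

# Placeholder: the Lua module's real MAX_FRAME_SIZE is not shown in this excerpt.
MAX_FRAME_SIZE = 16 * 1024 * 1024


def encode_frame(headers: bytes, payload: bytes) -> bytes:
    """Hypothetical helper: wrap pre-encoded headers and a payload in one frame."""
    total_length = 12 + len(headers) + len(payload) + 4
    prelude = struct.pack(">II", total_length, len(headers))
    prelude += struct.pack(">I", zlib.crc32(prelude))  # prelude CRC: first 8 bytes
    body = prelude + headers + payload
    return body + struct.pack(">I", zlib.crc32(body))  # message CRC: all prior bytes


def split_buf(buf: bytes) -> tuple:
    """Mirror of the Lua split_buf(): return (complete frames, remainder)."""
    pos = 0
    while pos + 12 <= len(buf):  # need a full 12-byte prelude
        (total_length,) = struct.unpack_from(">I", buf, pos)
        if total_length < 16 or total_length > MAX_FRAME_SIZE:
            break  # corrupt length field: leave it in the remainder
        if pos + total_length > len(buf):
            break  # frame not fully buffered yet
        (prelude_crc,) = struct.unpack_from(">I", buf, pos + 8)
        if zlib.crc32(buf[pos:pos + 8]) != prelude_crc:
            break  # do not skip past a bad prelude
        pos += total_length
    return buf[:pos], buf[pos:]
```

Called on two whole frames followed by the first five bytes of a third, `split_buf` returns the two frames as `complete` and the five stray bytes as `remainder`. The caller prepends that remainder to the next chunk.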
diff --git a/apisix/plugins/ai-transport/sse.lua b/apisix/plugins/ai-transport/sse.lua
index 249fcf9f8..ad8ab2712 100644
--- a/apisix/plugins/ai-transport/sse.lua
+++ b/apisix/plugins/ai-transport/sse.lua
@@ -24,7 +24,13 @@ local tonumber = tonumber
 local tostring = tostring
 local ipairs = ipairs
 
-local _M = {}
+local _M = {
+    -- Cap on bytes split_buf may leave in `remainder`. Used by the streaming
+    -- loop in ai-providers.base to bound the buffer when frames don't
+    -- complete. SSE frames are small (text events delimited by blank lines),
+    -- so 1 MiB is plenty.
+    max_remainder = 1024 * 1024,
+}
 
 
 --- Decode an SSE text chunk into a list of event tables.
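The `max_remainder` comment refers to a streaming loop in `ai-providers.base` that is not part of this excerpt. The following hypothetical Python sketch shows one way such a cap can bound the carry-over buffer. Each chunk is appended to the pending bytes and complete frames are emitted. The loop fails once the incomplete tail grows past the cap. The function names and the raise-on-overflow behavior are illustrative assumptions, and the toy SSE splitter simply cuts at the last blank line.

```python
from typing import Callable, Iterable, Iterator, Tuple

Splitter = Callable[[bytes], Tuple[bytes, bytes]]


def sse_split_buf(buf: bytes) -> Tuple[bytes, bytes]:
    """Toy splitter: an SSE event is complete once it ends with a blank line."""
    idx = buf.rfind(b"\n\n")
    if idx == -1:
        return b"", buf
    return buf[:idx + 2], buf[idx + 2:]


def pump(chunks: Iterable[bytes], split_buf: Splitter,
         max_remainder: int) -> Iterator[bytes]:
    """Yield complete framed data; refuse to buffer more than max_remainder bytes."""
    pending = b""
    for chunk in chunks:
        complete, pending = split_buf(pending + chunk)
        if complete:
            yield complete
        if len(pending) > max_remainder:
            raise ValueError(f"incomplete frame exceeds {max_remainder} bytes")
```

Because `pump` takes the splitter as a parameter, the same loop works for any framing module with a `split_buf` that returns `(complete, remainder)`. This matches the commit's note that providers pick a framing module by name.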
diff --git a/docs/en/latest/plugins/ai-proxy-multi.md b/docs/en/latest/plugins/ai-proxy-multi.md
index ca0a9b239..771801ba0 100644
--- a/docs/en/latest/plugins/ai-proxy-multi.md
+++ b/docs/en/latest/plugins/ai-proxy-multi.md
@@ -54,13 +54,14 @@ In addition, the Plugin also supports logging LLM request information in the acc
 
 When an instance's `provider` is set to `bedrock`, the Plugin expects requests in the [Bedrock Converse API](https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_Converse.html) format. The request URI must end with `/converse` and the body must contain a `messages` array.
 
-| Name               | Type   | Required | Description |
-| ------------------ | ------ | -------- | ----------- |
-| `messages`         | Array  | True     | An array of message objects. |
-| `messages.role`    | String | True     | Role of the message (`user`, `assistant`). |
-| `messages.content` | Array  | True     | An array of content blocks. Each block contains a `text` field (e.g., `[{"text": "What is 1+1?"}]`). |
-| `system`           | Array  | False    | Optional system prompt blocks (e.g., `[{"text": "You are a helpful assistant."}]`). |
-| `inferenceConfig`  | Object | False    | Optional inference parameters such as `maxTokens`, `temperature`, `topP`, `stopSequences`, etc. |
+| Name               | Type    | Required | Description |
+| ------------------ | ------- | -------- | ----------- |
+| `messages`         | Array   | True     | An array of message objects. |
+| `messages.role`    | String  | True     | Role of the message (`user`, `assistant`). |
+| `messages.content` | Array   | True     | An array of content blocks. Each block contains a `text` field (e.g., `[{"text": "What is 1+1?"}]`). |
+| `system`           | Array   | False    | Optional system prompt blocks (e.g., `[{"text": "You are a helpful assistant."}]`). |
+| `inferenceConfig`  | Object  | False    | Optional inference parameters such as `maxTokens`, `temperature`, `topP`, `stopSequences`, etc. |
+| `stream`           | Boolean | False    | When `true`, the Plugin proxies the request to Bedrock's [`ConverseStream`](https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_ConverseStream.html) endpoint and forwards the response in [AWS EventStream](https://docs.aws.amazon.com/AmazonS3/latest/API/RESTSelectObjectAppendix.html) (`application/vnd.amazon.eventstream`) binary framing. The flag is consumed by the Plugin and is not forwarded to Bedrock. |
 
 ## Attributes
 
@@ -2028,10 +2029,12 @@ https://bedrock-runtime.us-east-1.amazonaws.com/model/arn%3Aaws%3Abedrock%3Aus-e
 
 If `auth.aws.session_token` is set, it is used for temporary credentials (e.g., obtained from AWS STS or an assumed role) and will be added to the SigV4-signed request automatically. Both `auth.aws.secret_access_key` and `auth.aws.session_token` are stored encrypted.
 
-Streaming responses (Bedrock `ConverseStream`) are not yet supported by the Plugin.
-
 :::
 
+#### Streaming with Bedrock `ConverseStream`
+
+To enable streaming, add `"stream": true` to the same Converse request body and send it to the same route as non-streaming requests. The Plugin then routes the request to Bedrock's `/model/<model>/converse-stream` endpoint and forwards each AWS EventStream frame to the client unchanged. The response `Content-Type` is `application/vnd.amazon.eventstream`, so clients must parse the binary framing themselves (most AWS SDKs do this automatically).
+
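The Plugin forwards the EventStream frames unchanged, so a client that does not use an AWS SDK has to decode them itself. Below is a minimal Python sketch, not an official client. It verifies both CRCs and parses only string and byte-array header values; any other header value type raises an error. It assumes the ConverseStream payload shapes from the AWS API reference: `delta.text` in `contentBlockDelta` events and `usage` in the `metadata` event. Treating any `:message-type` other than `event` as fatal is a design choice of this sketch.

```python
import json
import struct
import zlib


def parse_headers(raw: bytes) -> dict:
    """Parse EventStream headers; supports byte-array (6) and string (7) values only."""
    headers, pos = {}, 0
    while pos < len(raw):
        name_len = raw[pos]
        name = raw[pos + 1:pos + 1 + name_len].decode("utf-8")
        value_type = raw[pos + 1 + name_len]
        pos += 2 + name_len
        if value_type not in (6, 7):
            raise ValueError(f"header {name!r}: unsupported value type {value_type}")
        (value_len,) = struct.unpack_from(">H", raw, pos)
        value = raw[pos + 2:pos + 2 + value_len]
        headers[name] = value.decode("utf-8") if value_type == 7 else value
        pos += 2 + value_len
    return headers


def decode(buf: bytes):
    """Yield (headers, payload) for each complete, CRC-valid frame in buf."""
    pos = 0
    while pos < len(buf):
        if pos + 12 > len(buf):
            raise ValueError("truncated prelude")
        total_length, headers_length, prelude_crc = struct.unpack_from(">III", buf, pos)
        end = pos + total_length
        if total_length < 16 or end > len(buf) or headers_length > total_length - 16:
            raise ValueError("invalid or truncated frame")
        if zlib.crc32(buf[pos:pos + 8]) != prelude_crc:
            raise ValueError("prelude CRC mismatch")
        (message_crc,) = struct.unpack_from(">I", buf, end - 4)
        if zlib.crc32(buf[pos:end - 4]) != message_crc:
            raise ValueError("message CRC mismatch")
        headers_end = pos + 12 + headers_length
        yield parse_headers(buf[pos + 12:headers_end]), buf[headers_end:end - 4]
        pos = end


def summarize(buf: bytes):
    """Concatenate delta text and return (text, usage) from a ConverseStream body."""
    parts, usage = [], None
    for headers, payload in decode(buf):
        if headers.get(":message-type") != "event":
            raise RuntimeError(f"{headers.get(':exception-type')}: {payload!r}")
        event_type = headers.get(":event-type")
        data = json.loads(payload)
        if event_type == "contentBlockDelta":
            parts.append(data.get("delta", {}).get("text", ""))
        elif event_type == "metadata":
            usage = data.get("usage")
    return "".join(parts), usage
```

This decoder expects the whole response body in memory. For incremental display, feed it only the complete frames that a `split_buf`-style splitter has separated from each chunk.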
 ### Proxy to Embedding Models
 
 The following example demonstrates how you can configure the `ai-proxy-multi` Plugin to proxy requests and load balance between embedding models.
diff --git a/docs/en/latest/plugins/ai-proxy.md b/docs/en/latest/plugins/ai-proxy.md
index b29a6f68f..7996cc25a 100644
--- a/docs/en/latest/plugins/ai-proxy.md
+++ b/docs/en/latest/plugins/ai-proxy.md
@@ -54,13 +54,14 @@ In addition, the Plugin also supports logging LLM request information in the acc
 
 When `provider` is set to `bedrock`, the Plugin expects requests in the [Bedrock Converse API](https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_Converse.html) format. The request URI must end with `/converse` and the body must contain a `messages` array.
 
-| Name               | Type   | Required | Description |
-| ------------------ | ------ | -------- | ----------- |
-| `messages`         | Array  | True     | An array of message objects. |
-| `messages.role`    | String | True     | Role of the message (`user`, `assistant`). |
-| `messages.content` | Array  | True     | An array of content blocks. Each block contains a `text` field (e.g., `[{"text": "What is 1+1?"}]`). |
-| `system`           | Array  | False    | Optional system prompt blocks (e.g., `[{"text": "You are a helpful assistant."}]`). |
-| `inferenceConfig`  | Object | False    | Optional inference parameters such as `maxTokens`, `temperature`, `topP`, etc. |
+| Name               | Type    | Required | Description |
+| ------------------ | ------- | -------- | ----------- |
+| `messages`         | Array   | True     | An array of message objects. |
+| `messages.role`    | String  | True     | Role of the message (`user`, `assistant`). |
+| `messages.content` | Array   | True     | An array of content blocks. Each block contains a `text` field (e.g., `[{"text": "What is 1+1?"}]`). |
+| `system`           | Array   | False    | Optional system prompt blocks (e.g., `[{"text": "You are a helpful assistant."}]`). |
+| `inferenceConfig`  | Object  | False    | Optional inference parameters such as `maxTokens`, `temperature`, `topP`, etc. |
+| `stream`           | Boolean | False    | When `true`, the Plugin proxies the request to Bedrock's [`ConverseStream`](https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_ConverseStream.html) endpoint and forwards the response in [AWS EventStream](https://docs.aws.amazon.com/AmazonS3/latest/API/RESTSelectObjectAppendix.html) (`application/vnd.amazon.eventstream`) binary framing. The flag is consumed by the Plugin and is not forwarded to Bedrock. |
 
 ## Attributes
 
@@ -884,10 +885,23 @@ https://bedrock-runtime.us-east-1.amazonaws.com/model/arn%3Aaws%3Abedrock%3Aus-e
 
 If `auth.aws.session_token` is set, it is used for temporary credentials (e.g., obtained from AWS STS or an assumed role) and will be added to the SigV4-signed request automatically. Both `auth.aws.secret_access_key` and `auth.aws.session_token` are stored encrypted.
 
-Streaming responses (Bedrock `ConverseStream`) are not yet supported by the Plugin.
-
 :::
 
+#### Streaming with Bedrock `ConverseStream`
+
+To enable streaming, add `"stream": true` to the same Converse request body and send it to the same route as non-streaming requests. The Plugin then routes the request to Bedrock's `/model/<model>/converse-stream` endpoint and forwards each AWS EventStream frame to the client unchanged. The response `Content-Type` is `application/vnd.amazon.eventstream`, so clients must parse the binary framing themselves (most AWS SDKs do this automatically).
+
+```shell
+curl "http://127.0.0.1:9080/bedrock/converse" -X POST \
+  -H "Content-Type: application/json" \
+  --data '{
+    "stream": true,
+    "messages": [
+      {"role": "user", "content": [{"text": "What is 1+1?"}]}
+    ]
+  }' --output -
+```
+
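The streaming paragraph describes two things the Plugin does with a streaming request. It picks `/converse-stream` instead of `/converse`, and it removes the `stream` flag from the body before forwarding; the mock upstream in the tests rejects bodies that still contain the flag. Below is a hypothetical Python sketch of that mapping, not the Plugin's Lua code. `upstream_request` is an illustrative name. Percent-encoding the model ID with `quote(..., safe="")` is an assumption, chosen because it reproduces the `%3A` seen in the test logs.

```python
from urllib.parse import quote


def upstream_request(body: dict, model: str):
    """Return (upstream path, forwarded body) for a Converse-style request."""
    forwarded = dict(body)  # shallow copy; leave the caller's body untouched
    streaming = forwarded.pop("stream", False) is True  # flag never goes upstream
    suffix = "converse-stream" if streaming else "converse"
    return f"/model/{quote(model, safe='')}/{suffix}", forwarded
```

For the model used in the tests, a body with `"stream": true` maps to `/model/anthropic.claude-3-5-sonnet-20241022-v2%3A0/converse-stream`. The same body without the flag maps to `.../converse`.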
 ### Proxy to OpenAI Embedding Models
 
 The following example demonstrates how you can configure the `ai-proxy` Plugin to proxy requests to embedding models. This example will use the OpenAI embedding model endpoint.
embedding model endpoint.
diff --git a/docs/zh/latest/plugins/ai-proxy-multi.md b/docs/zh/latest/plugins/ai-proxy-multi.md
index 358b7da0c..8148def3c 100644
--- a/docs/zh/latest/plugins/ai-proxy-multi.md
+++ b/docs/zh/latest/plugins/ai-proxy-multi.md
@@ -54,13 +54,14 @@ import TabItem from '@theme/TabItem';
 
 当某个实例的 `provider` 设置为 `bedrock` 时,插件期望请求采用 [Bedrock Converse API](https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_Converse.html) 格式。请求 URI 必须以 `/converse` 结尾,且请求体必须包含 `messages` 数组。
 
-| 名称               | 类型   | 必选项 | 描述 |
-| ------------------ | ------ | -------- | ---- |
-| `messages`         | Array  | 是     | 消息对象数组。 |
-| `messages.role`    | String | 是     | 消息的角色(`user`、`assistant`)。 |
-| `messages.content` | Array  | 是     | 内容块数组。每个块包含一个 `text` 字段(例如 `[{"text": "What is 1+1?"}]`)。 |
-| `system`           | Array  | 否    | 可选的系统提示块(例如 `[{"text": "You are a helpful assistant."}]`)。 |
-| `inferenceConfig`  | Object | 否    | 可选的推理参数,如 `maxTokens`、`temperature`、`topP` 等。 |
+| 名称               | 类型    | 必选项 | 描述 |
+| ------------------ | ------- | -------- | ---- |
+| `messages`         | Array   | 是     | 消息对象数组。 |
+| `messages.role`    | String  | 是     | 消息的角色(`user`、`assistant`)。 |
+| `messages.content` | Array   | 是     | 内容块数组。每个块包含一个 `text` 字段(例如 `[{"text": "What is 1+1?"}]`)。 |
+| `system`           | Array   | 否    | 可选的系统提示块(例如 `[{"text": "You are a helpful assistant."}]`)。 |
+| `inferenceConfig`  | Object  | 否    | 可选的推理参数,如 `maxTokens`、`temperature`、`topP` 等。 |
+| `stream`           | Boolean | 否    | 设置为 `true` 时,插件会将请求代理到 Bedrock 的 [`ConverseStream`](https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_ConverseStream.html) 接口,并以 [AWS EventStream](https://docs.aws.amazon.com/AmazonS3/latest/API/RESTSelectObjectAppendix.html) 二进制帧(`application/vnd.amazon.eventstream`)转发响应。该字段由插件消费,不会转发给 Bedrock。 |
 
 ## 属性
 
@@ -1841,10 +1842,12 @@ https://bedrock-runtime.us-east-1.amazonaws.com/model/arn%3Aaws%3Abedrock%3Aus-e
 
 如果设置了 `auth.aws.session_token`,则它将用于临时凭证(例如从 AWS STS 或扮演角色获得的凭证),并将自动添加到 SigV4 签名的请求中。`auth.aws.secret_access_key` 和 `auth.aws.session_token` 都以加密形式存储。
 
-插件目前尚不支持流式响应(Bedrock `ConverseStream`)。
-
 :::
 
+#### 使用 Bedrock `ConverseStream` 进行流式响应
+
+要启用流式响应,请使用相同的 Converse 请求体,并在其中加上 `"stream": true`。插件会将请求路由到 Bedrock 的 `/model/<model>/converse-stream` 接口,并将 AWS EventStream 帧原样转发给客户端。响应的 `Content-Type` 为 `application/vnd.amazon.eventstream`,客户端需自行解析二进制帧(多数 AWS SDK 已自动处理)。
+
 ### 代理到嵌入模型
 
 以下示例演示了如何配置 `ai-proxy-multi` 插件以代理请求并在嵌入模型之间进行负载均衡。
diff --git a/docs/zh/latest/plugins/ai-proxy.md b/docs/zh/latest/plugins/ai-proxy.md
index 0babeb969..425961251 100644
--- a/docs/zh/latest/plugins/ai-proxy.md
+++ b/docs/zh/latest/plugins/ai-proxy.md
@@ -54,13 +54,14 @@ import TabItem from '@theme/TabItem';
 
 当 `provider` 设置为 `bedrock` 时,插件期望请求采用 [Bedrock Converse API](https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_Converse.html) 格式。请求 URI 必须以 `/converse` 结尾,且请求体必须包含 `messages` 数组。
 
-| 名称               | 类型   | 必选项 | 描述 |
-| ------------------ | ------ | -------- | ---- |
-| `messages`         | Array  | 是     | 消息对象数组。 |
-| `messages.role`    | String | 是     | 消息的角色(`user`、`assistant`)。 |
-| `messages.content` | Array  | 是     | 内容块数组。每个块包含一个 `text` 字段(例如 `[{"text": "What is 1+1?"}]`)。 |
-| `system`           | Array  | 否    | 可选的系统提示块(例如 `[{"text": "You are a helpful assistant."}]`)。 |
-| `inferenceConfig`  | Object | 否    | 可选的推理参数,如 `maxTokens`、`temperature`、`topP` 等。 |
+| 名称               | 类型    | 必选项 | 描述 |
+| ------------------ | ------- | -------- | ---- |
+| `messages`         | Array   | 是     | 消息对象数组。 |
+| `messages.role`    | String  | 是     | 消息的角色(`user`、`assistant`)。 |
+| `messages.content` | Array   | 是     | 内容块数组。每个块包含一个 `text` 字段(例如 `[{"text": "What is 1+1?"}]`)。 |
+| `system`           | Array   | 否    | 可选的系统提示块(例如 `[{"text": "You are a helpful assistant."}]`)。 |
+| `inferenceConfig`  | Object  | 否    | 可选的推理参数,如 `maxTokens`、`temperature`、`topP` 等。 |
+| `stream`           | Boolean | 否    | 设置为 `true` 时,插件会将请求代理到 Bedrock 的 [`ConverseStream`](https://docs.aws.amazon.com/bedrock/latest/APIReference/API_runtime_ConverseStream.html) 接口,并以 [AWS EventStream](https://docs.aws.amazon.com/AmazonS3/latest/API/RESTSelectObjectAppendix.html) 二进制帧(`application/vnd.amazon.eventstream`)转发响应。该字段由插件消费,不会转发给 Bedrock。 |
 
 ## 属性
 
@@ -884,10 +885,23 @@ https://bedrock-runtime.us-east-1.amazonaws.com/model/arn%3Aaws%3Abedrock%3Aus-e
 
 如果设置了 `auth.aws.session_token`,则它将用于临时凭证(例如从 AWS STS 或扮演角色获得的凭证),并将自动添加到 SigV4 签名的请求中。`auth.aws.secret_access_key` 和 `auth.aws.session_token` 都以加密形式存储。
 
-插件目前尚不支持流式响应(Bedrock `ConverseStream`)。
-
 :::
 
+#### 使用 Bedrock `ConverseStream` 进行流式响应
+
+要启用流式响应,请使用相同的 Converse 请求体,并在其中加上 `"stream": true`。插件会将请求路由到 Bedrock 的 `/model/<model>/converse-stream` 接口,并将 AWS EventStream 帧原样转发给客户端。响应的 `Content-Type` 为 `application/vnd.amazon.eventstream`,客户端需自行解析二进制帧(多数 AWS SDK 已自动处理)。
+
+```shell
+curl "http://127.0.0.1:9080/bedrock/converse" -X POST \
+  -H "Content-Type: application/json" \
+  --data '{
+    "stream": true,
+    "messages": [
+      {"role": "user", "content": [{"text": "What is 1+1?"}]}
+    ]
+  }' --output -
+```
+
 ### 代理到 OpenAI 嵌入模型
 
 以下示例演示了如何配置 `ai-proxy` 插件以将请求代理到嵌入模型。此示例将使用 OpenAI 嵌入模型端点。
diff --git a/t/fixtures/bedrock/bedrock-converse-streaming.bin b/t/fixtures/bedrock/bedrock-converse-streaming.bin
new file mode 100644
index 000000000..db4e9fb5e
Binary files /dev/null and b/t/fixtures/bedrock/bedrock-converse-streaming.bin differ
diff --git a/t/lib/server.lua b/t/lib/server.lua
index aaac46fe6..acd6e7c2b 100644
--- a/t/lib/server.lua
+++ b/t/lib/server.lua
@@ -1046,6 +1046,114 @@ function _M.bedrock_converse()
 end
 
 
+-- Mock for Bedrock /converse-stream. Reuses the same SigV4 + body shape
+-- validation as bedrock_converse(), then serves the recorded EventStream
+-- binary fixture so streaming tests can assert end-to-end framing,
+-- response-text aggregation, and token-usage extraction.
+function _M.bedrock_converse_stream()
+    local json = require("cjson.safe")
+
+    ngx.log(ngx.WARN, "[test] received uri: ", ngx.var.request_uri)
+
+    if ngx.req.get_method() ~= "POST" then
+        ngx.status = 400
+        ngx.say("Unsupported request method: ", ngx.req.get_method())
+        return
+    end
+
+    local headers = ngx.req.get_headers()
+    local auth_header = headers["authorization"]
+    local amz_date = headers["x-amz-date"]
+    if not auth_header or not amz_date then
+        ngx.status = 403
+        ngx.say(json.encode({message = "Missing Authentication Token"}))
+        return
+    end
+
+    if not auth_header:match("^AWS4%-HMAC%-SHA256 ") then
+        ngx.status = 403
+        ngx.say(json.encode({
+            message = "Authorization header missing AWS4-HMAC-SHA256 algorithm prefix"
+        }))
+        return
+    end
+
+    if not auth_header:match(
+        "Credential=AKIAIOSFODNN7EXAMPLE/%d%d%d%d%d%d%d%d/us%-east%-1/bedrock/aws4_request"
+    ) then
+        ngx.status = 403
+        ngx.say(json.encode({
+            message = "Authorization Credential scope does not match expected "
+                .. "AKIAIOSFODNN7EXAMPLE/<DATE>/us-east-1/bedrock/aws4_request"
+        }))
+        return
+    end
+
+    local hex64 = string.rep("%x", 64)
+    if not auth_header:match("Signature=" .. hex64) then
+        ngx.status = 403
+        ngx.say(json.encode({
+            message = "Authorization Signature is missing or not 64 hex chars"
+        }))
+        return
+    end
+
+    if not amz_date:match("^%d%d%d%d%d%d%d%dT%d%d%d%d%d%dZ$") then
+        ngx.status = 403
+        ngx.say(json.encode({
+            message = "X-Amz-Date header does not match YYYYMMDDTHHMMSSZ format"
+        }))
+        return
+    end
+
+    ngx.req.read_body()
+    local body, err = json.decode(ngx.req.get_body_data() or "")
+    if not body then
+        ngx.status = 400
+        ngx.say(json.encode({message = "Invalid JSON: " .. (err or "")}))
+        return
+    end
+
+    -- Bedrock decides streaming purely by URL; the gateway must strip its
+    -- internal `stream` flag from the body before forwarding.
+    if body.stream ~= nil then
+        ngx.status = 400
+        ngx.say(json.encode({
+            message = "stream field should not be in request body for /converse-stream"
+        }))
+        return
+    end
+
+    if body.model then
+        ngx.status = 400
+        ngx.say(json.encode({
+            message = "model field should not be in request body"
+        }))
+        return
+    end
+
+    if not body.messages or #body.messages < 1 then
+        ngx.status = 400
+        ngx.say(json.encode({message = "messages is required"}))
+        return
+    end
+
+    local fixture_loader = require("lib.fixture_loader")
+    local content, ferr = fixture_loader.load("bedrock/bedrock-converse-streaming.bin")
+    if not content then
+        ngx.status = 500
+        ngx.say(ferr)
+        return
+    end
+
+    ngx.header["Content-Type"] = "application/vnd.amazon.eventstream"
+    ngx.header["Cache-Control"] = "no-cache"
+    ngx.header["Transfer-Encoding"] = "chunked"
+    ngx.print(content)
+    ngx.flush(true)
+end
+
+
 -- Error endpoints for ai-request-rewrite tests.
 function _M.bad_request()
     ngx.status = 400
@@ -1088,9 +1196,15 @@ end
 function _M.go()
     local uri = ngx.var.uri
 
-    -- Bedrock Converse API: /model/<model>/converse where <model> can contain
-    -- ':' and percent-encoded sequences (URL-encoded ARNs), which the path-to-
-    -- function-name conversion below can't represent. Dispatch directly.
+    -- Bedrock Converse API: /model/<model>/converse(-stream) where <model>
+    -- can contain ':' and percent-encoded sequences (URL-encoded ARNs),
+    -- which the path-to-function-name conversion below can't represent.
+    -- Dispatch directly. Check streaming first: /converse is a prefix of
+    -- /converse-stream (both patterns are `$`-anchored, so this is defensive).
+    if uri:match("^/model/.+/converse%-stream$") then
+        inject_headers()
+        return _M.bedrock_converse_stream()
+    end
     if uri:match("^/model/.+/converse$") then
         inject_headers()
         return _M.bedrock_converse()
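Both dispatch patterns in `_M.go()` are anchored with `^...$`, so the `/converse$` pattern cannot capture a `/converse-stream` URI, whichever pattern is checked first. A rough Python translation of the two Lua patterns makes that easy to check. Lua's `%-` becomes a plain `-`, which is literal outside a character class. The `dispatch` function and its return labels are illustrative only.

```python
import re

# Python equivalents of the Lua patterns used by the mock dispatcher.
CONVERSE_STREAM = re.compile(r"^/model/.+/converse-stream$")
CONVERSE = re.compile(r"^/model/.+/converse$")


def dispatch(uri: str) -> str:
    """Return which mock handler a URI would reach (labels are illustrative)."""
    if CONVERSE_STREAM.match(uri):
        return "bedrock_converse_stream"
    if CONVERSE.match(uri):
        return "bedrock_converse"
    return "other"
```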
diff --git a/t/plugin/ai-proxy-bedrock-single.t b/t/plugin/ai-proxy-bedrock-single.t
index 349036908..fc7e7d3b5 100644
--- a/t/plugin/ai-proxy-bedrock-single.t
+++ b/t/plugin/ai-proxy-bedrock-single.t
@@ -427,3 +427,73 @@ POST /single-ai/body-model-only/converse
 --- error_code: 400
 --- response_body eval
 qr/could not resolve upstream path/
+
+
+
+=== TEST 16: route for streaming (single ai-proxy, no path on endpoint)
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/9',
+                 ngx.HTTP_PUT,
+                 [[{
+                    "uri": "/single-ai/stream/converse",
+                    "plugins": {
+                        "ai-proxy": {
+                            "provider": "bedrock",
+                            "auth": {
+                                "aws": {
+                                    "access_key_id": "AKIAIOSFODNN7EXAMPLE",
+                                    "secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
+                                }
+                            },
+                            "provider_conf": {
+                                "region": "us-east-1"
+                            },
+                            "options": {
+                                "model": "anthropic.claude-3-5-sonnet-20241022-v2:0"
+                            },
+                            "override": {
+                                "endpoint": "http://127.0.0.1:1980"
+                            },
+                            "ssl_verify": false
+                        }
+                    }
+                }]]
+            )
+
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 17: stream request hits /converse-stream and forwards EventStream bytes
+--- request
+POST /single-ai/stream/converse
+{"stream":true,"messages":[{"role":"user","content":[{"text":"Say hi"}]}]}
+--- error_code: 200
+--- response_body eval
+qr/messageStart.*contentBlockDelta.*Hello.*messageStop.*metadata/s
+--- error_log eval
+qr{\[test\] received uri: /model/anthropic\.claude-3-5-sonnet-20241022-v2%3A0/converse-stream}
+--- response_headers
+Content-Type: application/vnd.amazon.eventstream
+
+
+
+=== TEST 18: non-stream request on the same route still hits /converse
+--- request
+POST /single-ai/stream/converse
+{"messages":[{"role":"user","content":[{"text":"What is 1+1?"}]}]}
+--- error_code: 200
+--- response_body eval
+qr/"text"\s*:\s*"Hello!"/
+--- error_log eval
+qr{\[test\] received uri: /model/anthropic\.claude-3-5-sonnet-20241022-v2%3A0/converse(?!-stream)}
diff --git a/t/plugin/ai-proxy-bedrock.t b/t/plugin/ai-proxy-bedrock.t
index 06be07905..00afa3e3a 100644
--- a/t/plugin/ai-proxy-bedrock.t
+++ b/t/plugin/ai-proxy-bedrock.t
@@ -516,3 +516,121 @@ POST /ai/body-model-only/converse
 --- error_code: 400
 --- response_body eval
 qr/could not resolve upstream path/
+
+
+
+=== TEST 17: route for streaming (no path on endpoint, model from options)
+--- config
+    location /t {
+        content_by_lua_block {
+            local t = require("lib.test_admin").test
+            local code, body = t('/apisix/admin/routes/9',
+                 ngx.HTTP_PUT,
+                 [[{
+                    "uri": "/ai/stream/converse",
+                    "plugins": {
+                        "ai-proxy-multi": {
+                            "instances": [
+                                {
+                                    "name": "bedrock-stream",
+                                    "provider": "bedrock",
+                                    "weight": 1,
+                                    "auth": {
+                                        "aws": {
+                                            "access_key_id": "AKIAIOSFODNN7EXAMPLE",
+                                            "secret_access_key": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
+                                        }
+                                    },
+                                    "provider_conf": {
+                                        "region": "us-east-1"
+                                    },
+                                    "options": {
+                                        "model": "anthropic.claude-3-5-sonnet-20241022-v2:0"
+                                    },
+                                    "override": {
+                                        "endpoint": "http://127.0.0.1:1980"
+                                    }
+                                }
+                            ],
+                            "ssl_verify": false
+                        }
+                    }
+                }]]
+            )
+
+            if code >= 300 then
+                ngx.status = code
+            end
+            ngx.say(body)
+        }
+    }
+--- response_body
+passed
+
+
+
+=== TEST 18: stream request hits /converse-stream and forwards EventStream bytes
+--- request
+POST /ai/stream/converse
+{"stream":true,"messages":[{"role":"user","content":[{"text":"Say hi"}]}]}
+--- error_code: 200
+--- response_body eval
+qr/messageStart.*contentBlockDelta.*Hello.*messageStop.*metadata/s
+--- error_log eval
+qr{\[test\] received uri: /model/anthropic\.claude-3-5-sonnet-20241022-v2%3A0/converse-stream}
+--- response_headers
+Content-Type: application/vnd.amazon.eventstream
+
+
+
+=== TEST 19: stream request aggregates response text and token usage
+--- config
+    location /t {
+        content_by_lua_block {
+            local http = require("resty.http")
+            local httpc = http.new()
+            local res, err = httpc:request_uri("http://127.0.0.1:" .. ngx.var.server_port
+                .. "/ai/stream/converse", {
+                method = "POST",
+                headers = {["Content-Type"] = "application/json"},
+                body = [[{"stream":true,"messages":[{"role":"user","content":[{"text":"hi"}]}]}]],
+            })
+            if not res then
+                ngx.status = 500
+                ngx.say(err)
+                return
+            end
+            ngx.status = res.status
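+            -- Body is binary EventStream; probe for payload-bearing keywords so
+            -- the test can assert each expected event and the token-bearing
+            -- metadata payload are present. The output order follows the list
+            -- below, not the byte order of the frames.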
+            local body = res.body
+            local found = {}
+            for _, name in ipairs({"messageStart", "contentBlockDelta",
+                                   "Hello", "messageStop", "metadata",
+                                   "inputTokens", "outputTokens"}) do
+                if body:find(name, 1, true) then
+                    found[#found + 1] = name
+                end
+            end
+            ngx.say(table.concat(found, ","))
+        }
+    }
+--- request
+GET /t
+--- error_code: 200
+--- response_body
+messageStart,contentBlockDelta,Hello,messageStop,metadata,inputTokens,outputTokens
+--- error_log eval
+qr/got token usage from ai service/
+
+
+
+=== TEST 20: non-stream request still hits /converse (control)
+--- request
+POST /ai/stream/converse
+{"messages":[{"role":"user","content":[{"text":"What is 1+1?"}]}]}
+--- error_code: 200
+--- response_body eval
+qr/"text"\s*:\s*"Hello!"/
+--- error_log eval
+qr{\[test\] received uri: /model/anthropic\.claude-3-5-sonnet-20241022-v2%3A0/converse(?!-stream)}
