EarthChen commented on PR #15957:
URL: https://github.com/apache/dubbo/pull/15957#issuecomment-3699782575

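   ## Core concepts

   ### Why is manual flow control needed?

   HTTP/2 uses **flow-control windows** to keep a sender from overwhelming a receiver. By default, Netty manages flow control automatically:
   - It sends a `WINDOW_UPDATE` frame automatically once data is received
   - This mode cannot provide backpressure

   **Manual flow control** lets the application layer decide when `WINDOW_UPDATE` is sent:
   - The window is returned only after the data has actually been processed
   - This gives true end-to-end backpressure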
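
   The contrast between the two modes can be sketched with a toy model in plain Java. This is only an illustration, not Netty or Dubbo code: `FlowWindow`, `tryReceive`, and `release` are hypothetical names, and the model assumes the sender simply stalls whenever the receiver's window is too small.

   ```java
   /** Toy model of a receive window; not Netty or Dubbo code. */
   final class FlowWindow {
       private final boolean autoReturn; // true mimics automatic flow control
       private int available;            // bytes the sender may still send

       FlowWindow(int initialSize, boolean autoReturn) {
           this.available = initialSize;
           this.autoReturn = autoReturn;
       }

       /** Returns false when the window is too small, i.e. the sender has to wait. */
       boolean tryReceive(int numBytes) {
           if (numBytes > available) {
               return false;
           }
           available -= numBytes;
           if (autoReturn) {
               // Automatic mode: the window is handed back immediately,
               // whether or not the application has processed the data.
               available += numBytes;
           }
           return true;
       }

       /** Manual mode: the application hands bytes back after processing them. */
       void release(int numBytes) {
           available += numBytes;
       }

       int available() {
           return available;
       }

       public static void main(String[] args) {
           FlowWindow auto = new FlowWindow(100, true);
           FlowWindow manual = new FlowWindow(100, false);
           for (int i = 0; i < 3; i++) {
               System.out.println("auto accepted:   " + auto.tryReceive(60));
               System.out.println("manual accepted: " + manual.tryReceive(60));
           }
           manual.release(60); // the slow consumer finally processed the first chunk
           System.out.println("manual after release: " + manual.tryReceive(60));
       }
   }
   ```

   In automatic mode the sender is never slowed down, however far the consumer falls behind. In manual mode the second and third chunks are refused until `release` is called, which is the backpressure effect described above.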
   
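   ### Key configuration

   | Option | Description | Default |
   |--------|-------------|---------|
   | `windowUpdateRatio` | Fraction of the initial window that must be consumed before a WINDOW_UPDATE is sent | 0.5 (50%) |
   | `initialWindowSize` | Initial flow-control window size | 8MB |
   | `AUTO_STREAM_FLOW_CONTROL` | Netty option; disables automatic flow control on Http2StreamChannel | false |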
   
   ## Architecture

   ```
   ┌─────────────────────────────────────────────────────────────────────────────┐
   │                           HTTP/2 connection layer                           │
   │  ┌───────────────────────────────────────────────────────────────────────┐  │
   │  │                    Http2FrameCodec                                    │  │
   │  │  ┌─────────────────────────────────────────────────────────────────┐  │  │
   │  │  │              TripleHttp2LocalFlowController                     │  │  │
   │  │  │  - receiveFlowControlledFrame(): shrinks the window             │  │  │
   │  │  │  - consumeBytes(): returns window, triggers WINDOW_UPDATE       │  │  │
   │  │  │  - windowUpdateRatio: controls when WINDOW_UPDATE is sent       │  │  │
   │  │  └─────────────────────────────────────────────────────────────────┘  │  │
   │  └───────────────────────────────────────────────────────────────────────┘  │
   │                                    │                                        │
   │                          Http2MultiplexHandler                              │
   │                                    │                                        │
   │              ┌─────────────────────┴─────────────────────┐                  │
   │              │                                           │                  │
   │  ┌───────────┴────────────┐               ┌──────────────┴───────────┐      │
   │  │   Http2StreamChannel   │               │   Http2StreamChannel     │      │
   │  │   (Stream 1)           │               │   (Stream 3)             │      │
   │  │   AUTO_FLOW = false    │               │   AUTO_FLOW = false      │      │
   │  └────────────────────────┘               └──────────────────────────┘      │
   └─────────────────────────────────────────────────────────────────────────────┘
   ```
   
   ## Server-side flow control

   ### Data flow diagram

   ```mermaid
   sequenceDiagram
       participant Client as Client
       participant Netty as Netty Http2FrameCodec
       participant FC as TripleHttp2LocalFlowController
       participant Handler as NettyHttp2FrameHandler
       participant Decoder as StreamingDecoder
       participant Listener as FragmentListener
       participant Channel as H2StreamChannel
       participant App as Business logic

       Client->>Netty: DATA frame
       Netty->>FC: receiveFlowControlledFrame()
       Note over FC: Shrink the window<br/>Track unconsumedBytes
       Netty->>Handler: onData(ByteBuf)
       Handler->>Decoder: decode(inputStream)

       loop For each message
           Decoder->>Decoder: processBody()
           Note over Decoder: Read message bytes
           Decoder->>Listener: bytesRead(numBytes)
           Listener->>Channel: consumeBytes(numBytes)
           Channel->>FC: consumeBytes(stream, numBytes)
           Note over FC: Accumulate returned bytes<br/>Check whether the threshold is reached
           alt windowUpdateRatio reached
               FC->>Client: WINDOW_UPDATE frame
           end
           Decoder->>Listener: onFragmentMessage(rawMessage)
           Listener->>App: Business logic
       end
   ```
   
   ### Key classes and responsibilities

   #### 1. TripleHttp2Protocol (initialization)

   ```java
   // Configure manual flow control when the server creates a connection
   private Http2Connection createHttp2ServerConnection(TripleConfig tripleConfig) {
       Http2Connection connection = new DefaultHttp2Connection(true);
       float windowUpdateRatio = tripleConfig.getWindowUpdateRatioOrDefault();
       connection.local().flowController(
           new TripleHttp2LocalFlowController(connection, windowUpdateRatio));
       return connection;
   }

   // Disable automatic flow control on the Http2StreamChannel
   protected void initChannel(Http2StreamChannel ch) {
       ch.config().setOption(Http2StreamChannelOption.AUTO_STREAM_FLOW_CONTROL, false);
       // ...
   }
   ```
   
   #### 2. TripleHttp2LocalFlowController (flow-control core)

   ```java
   public class TripleHttp2LocalFlowController extends DefaultHttp2LocalFlowController {

       public TripleHttp2LocalFlowController(Http2Connection connection, float windowUpdateRatio) {
           // windowUpdateRatio: send WINDOW_UPDATE once consumed bytes reach this fraction of the initial window
           // true: the third argument of the Netty superclass constructor (autoRefillConnectionWindow)
           super(connection, windowUpdateRatio, true);
       }
   }
   ```
   
   #### 3. GenericHttp2ServerTransportListener (server entry point)

   ```java
   @Override
   protected HttpMessageListener buildHttpMessageListener() {
       // Install a FragmentListener to drive flow control
       streamingDecoder.setFragmentListener(new DefaultFragmentListener(listeningDecoder));
       return new StreamingHttpMessageListener(streamingDecoder);
   }

   final class DefaultFragmentListener implements FragmentListener {
       @Override
       public void bytesRead(int numBytes) {
           // Once the data has been read, tell the flow controller to return the window
           getH2StreamChannel().consumeBytes(numBytes);
       }
   }
   ```
   
   #### 4. LengthFieldStreamingDecoder / TriDecoder (decode trigger)

   ```java
   private void processBody() {
       int totalBytesRead = lengthFieldOffset + lengthFieldLength + requiredLength;

       byte[] rawMessage;
       try {
           rawMessage = readRawMessage(accumulate, requiredLength);
       } finally {
           // Report bytesRead in a finally block so that flow control is always triggered
           listener.bytesRead(totalBytesRead);
       }

       // Process the message
       invokeListener(inputStream);
   }
   ```
   
   #### 5. NettyH2StreamChannel (performs flow control)

   ```java
   @Override
   public void consumeBytes(int numBytes) throws Exception {
       Http2LocalFlowController localFlowController = http2Connection.local().flowController();
       int streamId = http2StreamChannel.stream().id();
       Http2Stream stream = http2Connection.stream(streamId);

       // Must run on the EventLoop thread
       if (http2StreamChannel.eventLoop().inEventLoop()) {
           localFlowController.consumeBytes(stream, numBytes);
       } else {
           http2StreamChannel.eventLoop().execute(() -> {
               localFlowController.consumeBytes(stream, numBytes);
           });
       }
   }
   ```
   
   ### Server-side data path

   ```
   DATA frame received
       │
       ▼
   Http2FrameCodec.onDataRead() → returns 0, no automatic consumption
       │
       ▼
   TripleHttp2LocalFlowController.receiveFlowControlledFrame()
       │  - shrink the stream window
       │  - track unconsumedBytes
       ▼
   NettyHttp2FrameHandler.onData()
       │
       ▼
   StreamingDecoder.decode()
       │
       ▼
   LengthFieldStreamingDecoder.processBody()
       │
       ├──► read message bytes
       │
       ├──► finally { listener.bytesRead(numBytes) }  ◄─── Key: inside a finally block
       │         │
       │         ▼
       │    DefaultFragmentListener.bytesRead()
       │         │
       │         ▼
       │    H2StreamChannel.consumeBytes()
       │         │
       │         ▼
       │    TripleHttp2LocalFlowController.consumeBytes()
       │         │
       │         ▼
       │    [if windowUpdateRatio is reached] → send WINDOW_UPDATE
       │
       ▼
   listener.onFragmentMessage() → business logic
   ```
   
   ---
   
   ## Client-side flow control

   ### Data flow diagram

   ```mermaid
   sequenceDiagram
       participant Server as Server
       participant Netty as Netty Http2FrameCodec
       participant FC as TripleHttp2LocalFlowController
       participant Handler as TripleHttp2ClientResponseHandler
       participant Stream as AbstractTripleClientStream
       participant Decoder as TriDecoder
       participant Listener as TriDecoder.Listener
       participant App as Business callback

       Server->>Netty: DATA frame
       Netty->>FC: receiveFlowControlledFrame()
       Note over FC: Shrink the window
       Netty->>Handler: onData(ByteBuf)
       Handler->>Stream: H2TransportListener.onData()
       Stream->>Decoder: deframer.deframe(data)

       loop For each message
           Decoder->>Decoder: processBody()
           Note over Decoder: Read message bytes
           Decoder->>Listener: bytesRead(numBytes)
           Listener->>Stream: consumeBytes(numBytes)
           Stream->>FC: Http2LocalFlowController.consumeBytes()
           Note over FC: Accumulate returned bytes
           alt windowUpdateRatio reached
               FC->>Server: WINDOW_UPDATE frame
           end
           Decoder->>Listener: onRawMessage(data)
           Listener->>App: Business logic
       end
   ```
   
   ### Key classes and responsibilities

   #### 1. TripleHttp2Protocol (client initialization)

   ```java
   @Override
   public void configClientPipeline(URL url, ChannelOperator operator, ContextOperator contextOperator) {
       TripleConfig tripleConfig = ConfigManager.getProtocolOrDefault(url).getTripleOrDefault();
       // Create a connection with manual flow control
       Http2Connection connection = createHttp2ClientConnection(tripleConfig);
       // ...
   }

   private Http2Connection createHttp2ClientConnection(TripleConfig tripleConfig) {
       Http2Connection connection = new DefaultHttp2Connection(false);
       float windowUpdateRatio = tripleConfig.getWindowUpdateRatioOrDefault();
       connection.local().flowController(
           new TripleHttp2LocalFlowController(connection, windowUpdateRatio));
       return connection;
   }
   ```
   
   #### 2. Http2TripleClientStream (client stream)

   ```java
   @Override
   protected TripleStreamChannelFuture initStreamChannel(Channel parent) {
       Http2StreamChannelBootstrap bootstrap = new Http2StreamChannelBootstrap(parent);
       // Disable automatic flow control
       bootstrap.option(Http2StreamChannelOption.AUTO_STREAM_FLOW_CONTROL, false);
       // ...
   }

   @Override
   protected void consumeBytes(int numBytes) {
       // Get the Http2Connection from the parent channel
       Http2Connection http2Connection = getHttp2Connection();
       Http2LocalFlowController localFlowController = http2Connection.local().flowController();

       // Ask the flow controller to consume the bytes
       localFlowController.consumeBytes(stream, numBytes);
   }

   private Http2Connection getHttp2Connection() {
       ChannelHandlerContext ctx = parent.pipeline().context(Http2FrameCodec.class);
       Http2FrameCodec codec = (Http2FrameCodec) ctx.handler();
       return codec.connection();
   }
   ```
   
   #### 3. AbstractTripleClientStream.ClientTransportListener (sets up the decoder listener)

   ```java
   void onHeaderReceived(Http2Headers headers) {
       // Create the decoder listener
       TriDecoder.Listener listener = new TriDecoder.Listener() {
           @Override
           public void bytesRead(int numBytes) {
               // Call the abstract method, which subclasses implement
               consumeBytes(numBytes);
           }

           @Override
           public void onRawMessage(byte[] data) {
               AbstractTripleClientStream.this.listener.onMessage(data, isReturnTriException);
           }
       };
       deframer = new TriDecoder(decompressor, listener);
   }
   ```
   
   #### 4. TriDecoder (gRPC decoder)

   ```java
   private void processBody() {
       int totalBytesRead = HEADER_LENGTH + requiredLength;

       byte[] stream;
       try {
           stream = compressedFlag ? getCompressedBody() : getUncompressedBody();
       } finally {
           // Notify flow control in a finally block
           listener.bytesRead(totalBytesRead);
       }

       listener.onRawMessage(stream);
   }
   ```
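
   To show why `totalBytesRead` includes the header, here is a minimal sketch of a decoder for gRPC-style length-prefixed messages (a 1-byte compressed flag followed by a 4-byte big-endian length). It is not the real `TriDecoder`: `ToyFrameDecoder` and its nested `Listener` are hypothetical names, it works on a plain `ByteBuffer`, and it ignores decompression.

   ```java
   import java.nio.ByteBuffer;
   import java.util.ArrayList;
   import java.util.List;

   /** Toy decoder for gRPC-style length-prefixed messages; not the real TriDecoder. */
   final class ToyFrameDecoder {
       static final int HEADER_LENGTH = 5; // 1-byte compressed flag + 4-byte big-endian length

       interface Listener {
           void bytesRead(int numBytes);
           void onRawMessage(byte[] data);
       }

       /** Decodes every complete message in buf; an incomplete trailing message is left unread. */
       static void decode(ByteBuffer buf, Listener listener) {
           while (buf.remaining() >= HEADER_LENGTH) {
               buf.mark();
               buf.get();                  // compressed flag, ignored in this sketch
               int length = buf.getInt();  // ByteBuffer reads big-endian by default
               if (length < 0) {
                   throw new IllegalArgumentException("negative message length");
               }
               if (buf.remaining() < length) {
                   buf.reset();            // body not complete yet, wait for more data
                   return;
               }
               byte[] body = new byte[length];
               try {
                   buf.get(body);
               } finally {
                   // Report header + body so the whole frame is credited back to the window.
                   listener.bytesRead(HEADER_LENGTH + length);
               }
               listener.onRawMessage(body);
           }
       }

       public static void main(String[] args) {
           ByteBuffer buf = ByteBuffer.allocate(32);
           buf.put((byte) 0).putInt(3).put(new byte[] {1, 2, 3});
           buf.put((byte) 0).putInt(2).put(new byte[] {4, 5});
           buf.flip();

           List<Integer> reported = new ArrayList<>();
           decode(buf, new Listener() {
               @Override public void bytesRead(int numBytes) { reported.add(numBytes); }
               @Override public void onRawMessage(byte[] data) { }
           });
           System.out.println(reported); // prints [8, 7]
       }
   }
   ```

   The flow controller counts every byte of a DATA frame, so a decoder that reported only the body length would leave 5 bytes per message unreturned and the window would slowly drain.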
   
   ### Client-side data path

   ```
   DATA frame received (server response)
       │
       ▼
   Http2FrameCodec.onDataRead() → returns 0, no automatic consumption
       │
       ▼
   TripleHttp2LocalFlowController.receiveFlowControlledFrame()
       │  - shrink the stream window
       ▼
   TripleHttp2ClientResponseHandler.onData()
       │
       ▼
   ClientTransportListener.onData()
       │
       ▼
   TriDecoder.deframe(data)
       │
       ▼
   TriDecoder.processBody()
       │
       ├──► read / decompress the message
       │
       ├──► finally { listener.bytesRead(numBytes) }  ◄─── Key!
       │         │
       │         ▼
       │    TriDecoder.Listener.bytesRead()
       │         │
       │         ▼
       │    AbstractTripleClientStream.consumeBytes()
       │         │
       │         ▼
       │    Http2TripleClientStream.consumeBytes()
       │         │
       │         ▼
       │    TripleHttp2LocalFlowController.consumeBytes()
       │         │
       │         ▼
       │    [if windowUpdateRatio is reached] → send WINDOW_UPDATE
       │
       ▼
   listener.onRawMessage() → business callback
   ```
   
   ---
   
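   ## Configuration

   ### How windowUpdateRatio works

   Assume `initialWindowSize = 8MB` and `windowUpdateRatio = 0.5`:

   1. The initial window is 8MB
   2. Data is received and the window shrinks
   3. `consumeBytes()` is called to return bytes
   4. Once a total of 4MB (8MB × 0.5) has been returned, a `WINDOW_UPDATE` frame is sent
   5. After receiving it, the remote peer can continue sending data

   ```
   Window state over time:

   Initial:         [████████████████████████████████] 8MB available

   Receive 2MB:     [████████████████████████░░░░░░░░] 6MB available
                    consumeBytes(2MB) - 2MB in total, below the threshold

   Receive 2MB:     [████████████████░░░░░░░░░░░░░░░░] 4MB available
                    consumeBytes(2MB) - 4MB in total, 50% reached!
                    → send WINDOW_UPDATE (4MB)

   Window restored: [████████████████████████████████] 8MB available
   ```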
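
   The walkthrough above can be replayed with a small simulation. This is a toy model of the rule as described in this comment, not Netty's actual bookkeeping: `WindowUpdateSim` and its methods are hypothetical names, and the model assumes a WINDOW_UPDATE carries all bytes returned since the previous one.

   ```java
   /** Toy model of the threshold rule described above; not Netty's implementation. */
   final class WindowUpdateSim {
       private final int threshold; // initialWindow * windowUpdateRatio
       private int window;          // bytes the peer may still send
       private int pendingReturn;   // consumed bytes not yet announced to the peer

       WindowUpdateSim(int initialWindow, float windowUpdateRatio) {
           this.threshold = (int) (initialWindow * windowUpdateRatio);
           this.window = initialWindow;
       }

       /** A DATA frame arrived: the window shrinks. */
       void receive(int numBytes) {
           if (numBytes > window) {
               throw new IllegalStateException("flow-control window exceeded");
           }
           window -= numBytes;
       }

       /** The application processed numBytes; returns the WINDOW_UPDATE increment, or 0 if none is sent. */
       int consumeBytes(int numBytes) {
           pendingReturn += numBytes;
           if (pendingReturn < threshold) {
               return 0; // below the threshold, keep accumulating
           }
           int increment = pendingReturn;
           window += increment;
           pendingReturn = 0;
           return increment;
       }

       int window() {
           return window;
       }

       public static void main(String[] args) {
           final int MB = 1024 * 1024;
           WindowUpdateSim sim = new WindowUpdateSim(8 * MB, 0.5f);
           sim.receive(2 * MB);
           System.out.println(sim.consumeBytes(2 * MB) / MB); // prints 0
           sim.receive(2 * MB);
           System.out.println(sim.consumeBytes(2 * MB) / MB); // prints 4
           System.out.println(sim.window() / MB);             // prints 8
       }
   }
   ```

   A lower ratio in this model means smaller, more frequent WINDOW_UPDATE frames; a higher ratio batches more bytes per frame but lets the peer's view of the window shrink further before it is refilled.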
   
   ---
   
   ## Design summary

   ### 1. Automatic flow control is disabled at two layers

   | Layer | Setting | Purpose |
   |-------|---------|---------|
   | Http2Connection | TripleHttp2LocalFlowController | Controls when WINDOW_UPDATE is sent |
   | Http2StreamChannel | AUTO_STREAM_FLOW_CONTROL=false | Stops Netty from sending WINDOW_UPDATE automatically |
   
   ### 2. bytesRead is called in a finally block

   This keeps flow control working even if message processing fails:

   ```java
   try {
       rawMessage = readRawMessage();
   } finally {
       listener.bytesRead(totalBytesRead);  // must always run
   }
   ```
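
   The guarantee can be checked in isolation with a small sketch. `FinallyAccounting`, `Reader`, and `readAndAccount` are hypothetical names, and an `AtomicInteger` stands in for the `listener.bytesRead(...)` call:

   ```java
   import java.util.concurrent.atomic.AtomicInteger;

   /** Toy demonstration that a finally block reports bytes even when reading fails. */
   final class FinallyAccounting {
       interface Reader {
           byte[] read() throws Exception;
       }

       /** Reads one message and always credits totalBytesRead to the counter. */
       static byte[] readAndAccount(Reader reader, int totalBytesRead, AtomicInteger returned)
               throws Exception {
           try {
               return reader.read();
           } finally {
               returned.addAndGet(totalBytesRead); // stands in for listener.bytesRead(...)
           }
       }

       public static void main(String[] args) {
           AtomicInteger returned = new AtomicInteger();
           try {
               readAndAccount(() -> { throw new IllegalStateException("corrupt frame"); }, 128, returned);
           } catch (Exception e) {
               System.out.println("read failed: " + e.getMessage());
           }
           System.out.println("bytes returned: " + returned.get()); // prints bytes returned: 128
       }
   }
   ```

   Without the `finally`, the 128 bytes of the failed frame would never be handed back, and repeated failures could shrink the window until the peer stops sending.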
   
   ### 3. EventLoop thread safety

   `consumeBytes()` must run on the EventLoop thread:

   ```java
   if (eventLoop().inEventLoop()) {
       localFlowController.consumeBytes(stream, numBytes);
   } else {
       eventLoop().execute(() -> localFlowController.consumeBytes(stream, numBytes));
   }
   ```
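
   The same run-inline-or-dispatch pattern can be tried without Netty. In this sketch a JDK single-thread executor stands in for the EventLoop; `ToyEventLoop` and `runOnLoop` are hypothetical names used only for illustration.

   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.TimeUnit;

   /** Toy single-threaded "event loop" showing the run-inline-or-dispatch pattern; not Netty code. */
   final class ToyEventLoop {
       private final ExecutorService executor;
       private volatile Thread loopThread;

       ToyEventLoop() {
           executor = Executors.newSingleThreadExecutor(r -> {
               Thread t = new Thread(r, "toy-event-loop");
               loopThread = t; // remember the loop thread for inEventLoop()
               return t;
           });
       }

       boolean inEventLoop() {
           return Thread.currentThread() == loopThread;
       }

       /** Runs the task inline when already on the loop thread, otherwise hands it to the loop. */
       void runOnLoop(Runnable task) {
           if (inEventLoop()) {
               task.run();
           } else {
               executor.execute(task);
           }
       }

       void shutdown() throws InterruptedException {
           executor.shutdown();
           executor.awaitTermination(5, TimeUnit.SECONDS);
       }

       public static void main(String[] args) throws InterruptedException {
           ToyEventLoop loop = new ToyEventLoop();
           loop.runOnLoop(() -> {
               System.out.println("outer on " + Thread.currentThread().getName());
               // Already on the loop thread, so this nested call runs inline.
               loop.runOnLoop(() -> System.out.println("inner on " + Thread.currentThread().getName()));
           });
           loop.shutdown();
       }
   }
   ```

   Because every state change is funneled through one thread, the flow controller's counters need no locking; the cost is that callers on other threads get asynchronous execution.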
   
   
   ---
   
   ## Class diagram
   
   ```mermaid
   classDiagram
       class TripleConfig {
           +Float windowUpdateRatio
           +Integer initialWindowSize
           +getWindowUpdateRatioOrDefault() float
       }
       
       class TripleHttp2LocalFlowController {
           +TripleHttp2LocalFlowController(connection, windowUpdateRatio)
           +receiveFlowControlledFrame()
           +consumeBytes(stream, numBytes)
       }
       
       class DefaultHttp2LocalFlowController {
           +consumeBytes(stream, numBytes)
       }
       
       class H2StreamChannel {
           <<interface>>
           +consumeBytes(numBytes)
       }
       
       class NettyH2StreamChannel {
           -Http2Connection http2Connection
           +consumeBytes(numBytes)
       }
       
       class StreamingDecoder {
           <<interface>>
           +setFragmentListener(listener)
       }
       
       class FragmentListener {
           <<interface>>
           +bytesRead(numBytes)
           +onFragmentMessage(rawMessage)
       }
       
       class AbstractTripleClientStream {
           <<abstract>>
           #consumeBytes(numBytes)*
       }
       
       class Http2TripleClientStream {
           +consumeBytes(numBytes)
           -getHttp2Connection()
       }
       
       TripleHttp2LocalFlowController --|> DefaultHttp2LocalFlowController
       NettyH2StreamChannel ..|> H2StreamChannel
       Http2TripleClientStream --|> AbstractTripleClientStream
       
       NettyH2StreamChannel --> TripleHttp2LocalFlowController : uses
       Http2TripleClientStream --> TripleHttp2LocalFlowController : uses
   ```
   
   ---
   

