EarthChen commented on PR #15957:
URL: https://github.com/apache/dubbo/pull/15957#issuecomment-3699782575
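## Core concepts
### Why manual flow control?
HTTP/2 uses **flow-control windows** to keep a sender from overwhelming a receiver. By default, Netty manages flow control automatically:
- It sends a `WINDOW_UPDATE` frame as soon as data is received
- This mode cannot provide backpressure
**Manual flow control** lets the application layer decide when `WINDOW_UPDATE` is sent:
- Window is returned only after the data has actually been processed
- This gives real end-to-end backpressure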
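
To make the contrast concrete, here is a minimal sketch in plain Java with no Netty involved. `BackpressureSketch` and `bufferedBytes` are hypothetical names used only for illustration. The model assumes a receiver whose application never processes anything and counts how many bytes a sender can push into it under each mode.

```java
/** Toy model of an HTTP/2 receive window; names are illustrative, not Dubbo or Netty APIs. */
final class BackpressureSketch {

    /**
     * Counts the bytes that pile up at a receiver whose application never processes them.
     *
     * @param autoReturnWindow true models automatic flow control (window returned on receipt)
     */
    static int bufferedBytes(boolean autoReturnWindow, int window, int frameSize, int framesOffered) {
        int available = window;
        int buffered = 0;
        for (int i = 0; i < framesOffered; i++) {
            if (frameSize > available) {
                break; // the sender has to wait for a WINDOW_UPDATE that never comes
            }
            available -= frameSize;
            buffered += frameSize;
            if (autoReturnWindow) {
                available += frameSize; // window handed back before any processing happened
            }
        }
        return buffered;
    }

    public static void main(String[] args) {
        System.out.println("automatic: " + bufferedBytes(true, 100, 40, 10));
        System.out.println("manual:    " + bufferedBytes(false, 100, 40, 10));
    }
}
```

With a 100-byte window and 40-byte frames, the automatic mode accepts all ten frames (400 bytes buffered), while the manual mode stops the sender after two frames (80 bytes).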
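### Key configuration
| Option | Description | Default |
|--------|-------------|---------|
| `windowUpdateRatio` | Fraction of the initial window that must be consumed before a WINDOW_UPDATE is sent | 0.5 (50%) |
| `initialWindowSize` | Initial flow-control window size | 8MB |
| `AUTO_STREAM_FLOW_CONTROL` | Netty option; set to `false` to disable automatic flow control on Http2StreamChannel | false |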
## Architecture diagram
```
┌─────────────────────────────────────────────────────────────────────────────┐
│                           HTTP/2 connection layer                           │
│   ┌─────────────────────────────────────────────────────────────────────┐   │
│   │                           Http2FrameCodec                           │   │
│   │   ┌─────────────────────────────────────────────────────────────┐   │   │
│   │   │               TripleHttp2LocalFlowController                │   │   │
│   │   │ - receiveFlowControlledFrame(): shrinks the window          │   │   │
│   │   │ - consumeBytes(): returns window, triggers WINDOW_UPDATE    │   │   │
│   │   │ - windowUpdateRatio: controls when WINDOW_UPDATE is sent    │   │   │
│   │   └─────────────────────────────────────────────────────────────┘   │   │
│   └─────────────────────────────────────────────────────────────────────┘   │
│                                      │                                      │
│                            Http2MultiplexHandler                            │
│                                      │                                      │
│                ┌─────────────────────┴─────────────────────┐                │
│                │                                           │                │
│    ┌───────────┴────────────┐               ┌──────────────┴───────────┐    │
│    │   Http2StreamChannel   │               │    Http2StreamChannel    │    │
│    │       (Stream 1)       │               │        (Stream 3)        │    │
│    │   AUTO_FLOW = false    │               │    AUTO_FLOW = false     │    │
│    └────────────────────────┘               └──────────────────────────┘    │
└─────────────────────────────────────────────────────────────────────────────┘
```
## Server-side flow control
### Data flow diagram
```mermaid
sequenceDiagram
    participant Client as Client
    participant Netty as Netty Http2FrameCodec
    participant FC as TripleHttp2LocalFlowController
    participant Handler as NettyHttp2FrameHandler
    participant Decoder as StreamingDecoder
    participant Listener as FragmentListener
    participant Channel as H2StreamChannel
    participant App as Application
    Client->>Netty: DATA frame
    Netty->>FC: receiveFlowControlledFrame()
    Note over FC: Shrink the window<br/>Track unconsumedBytes
    Netty->>Handler: onData(ByteBuf)
    Handler->>Decoder: decode(inputStream)
    loop For each message
        Decoder->>Decoder: processBody()
        Note over Decoder: Read message bytes
        Decoder->>Listener: bytesRead(numBytes)
        Listener->>Channel: consumeBytes(numBytes)
        Channel->>FC: consumeBytes(stream, numBytes)
        Note over FC: Accumulate returned bytes<br/>Check the threshold
        alt windowUpdateRatio reached
            FC->>Client: WINDOW_UPDATE frame
        end
        Decoder->>Listener: onFragmentMessage(rawMessage)
        Listener->>App: Business processing
    end
```
### Key classes and responsibilities
#### 1. TripleHttp2Protocol (initialization)
```java
// Configure manual flow control when the server creates a connection
private Http2Connection createHttp2ServerConnection(TripleConfig tripleConfig) {
    Http2Connection connection = new DefaultHttp2Connection(true);
    float windowUpdateRatio = tripleConfig.getWindowUpdateRatioOrDefault();
    connection.local().flowController(
            new TripleHttp2LocalFlowController(connection, windowUpdateRatio));
    return connection;
}

// Disable automatic flow control on Http2StreamChannel
protected void initChannel(Http2StreamChannel ch) {
    ch.config().setOption(Http2StreamChannelOption.AUTO_STREAM_FLOW_CONTROL, false);
    // ...
}
```
#### 2. TripleHttp2LocalFlowController (flow-control core)
```java
public class TripleHttp2LocalFlowController extends DefaultHttp2LocalFlowController {
    public TripleHttp2LocalFlowController(Http2Connection connection, float windowUpdateRatio) {
        // windowUpdateRatio: send WINDOW_UPDATE once consumed bytes reach this fraction of the initial window
        // true: third argument of the parent constructor, which Netty names autoRefillConnectionWindow
        super(connection, windowUpdateRatio, true);
    }
}
```
#### 3. GenericHttp2ServerTransportListener (server entry point)
```java
@Override
protected HttpMessageListener buildHttpMessageListener() {
    // Install a FragmentListener to drive flow control
    streamingDecoder.setFragmentListener(new DefaultFragmentListener(listeningDecoder));
    return new StreamingHttpMessageListener(streamingDecoder);
}

final class DefaultFragmentListener implements FragmentListener {
    @Override
    public void bytesRead(int numBytes) {
        // Once data has been read, tell the flow controller to return window
        getH2StreamChannel().consumeBytes(numBytes);
    }
}
```
#### 4. LengthFieldStreamingDecoder / TriDecoder (decode trigger)
```java
private void processBody() {
    int totalBytesRead = lengthFieldOffset + lengthFieldLength + requiredLength;
    byte[] rawMessage;
    try {
        rawMessage = readRawMessage(accumulate, requiredLength);
    } finally {
        // Report bytesRead in a finally block so flow control is always triggered
        listener.bytesRead(totalBytesRead);
    }
    // Process the message
    invokeListener(inputStream);
}
```
#### 5. NettyH2StreamChannel (performs flow control)
```java
@Override
public void consumeBytes(int numBytes) throws Exception {
    Http2LocalFlowController localFlowController = http2Connection.local().flowController();
    int streamId = http2StreamChannel.stream().id();
    Http2Stream stream = http2Connection.stream(streamId);
    // Must run on the EventLoop thread
    if (http2StreamChannel.eventLoop().inEventLoop()) {
        localFlowController.consumeBytes(stream, numBytes);
    } else {
        http2StreamChannel.eventLoop().execute(() -> {
            localFlowController.consumeBytes(stream, numBytes);
        });
    }
}
```
### Server-side data path
```
DATA frame received
    │
    ▼
Http2FrameCodec.onDataRead() → returns 0, no automatic consumption
    │
    ▼
TripleHttp2LocalFlowController.receiveFlowControlledFrame()
    │  - shrink the stream window
    │  - track unconsumedBytes
    ▼
NettyHttp2FrameHandler.onData()
    │
    ▼
StreamingDecoder.decode()
    │
    ▼
LengthFieldStreamingDecoder.processBody()
    │
    ├──► read message bytes
    │
    ├──► finally { listener.bytesRead(numBytes) }  ◄─── Key: runs inside finally
    │         │
    │         ▼
    │    DefaultFragmentListener.bytesRead()
    │         │
    │         ▼
    │    H2StreamChannel.consumeBytes()
    │         │
    │         ▼
    │    TripleHttp2LocalFlowController.consumeBytes()
    │         │
    │         ▼
    │    [if windowUpdateRatio is reached] → send WINDOW_UPDATE
    │
    ▼
listener.onFragmentMessage() → business processing
```
---
## Client-side flow control
### Data flow diagram
```mermaid
sequenceDiagram
    participant Server as Server
    participant Netty as Netty Http2FrameCodec
    participant FC as TripleHttp2LocalFlowController
    participant Handler as TripleHttp2ClientResponseHandler
    participant Stream as AbstractTripleClientStream
    participant Decoder as TriDecoder
    participant Listener as TriDecoder.Listener
    participant App as Application callback
    Server->>Netty: DATA frame
    Netty->>FC: receiveFlowControlledFrame()
    Note over FC: Shrink the window
    Netty->>Handler: onData(ByteBuf)
    Handler->>Stream: H2TransportListener.onData()
    Stream->>Decoder: deframer.deframe(data)
    loop For each message
        Decoder->>Decoder: processBody()
        Note over Decoder: Read message bytes
        Decoder->>Listener: bytesRead(numBytes)
        Listener->>Stream: consumeBytes(numBytes)
        Stream->>FC: Http2LocalFlowController.consumeBytes()
        Note over FC: Accumulate returned bytes
        alt windowUpdateRatio reached
            FC->>Server: WINDOW_UPDATE frame
        end
        Decoder->>Listener: onRawMessage(data)
        Listener->>App: Business processing
    end
```
### Key classes and responsibilities
#### 1. TripleHttp2Protocol (client initialization)
```java
@Override
public void configClientPipeline(URL url, ChannelOperator operator, ContextOperator contextOperator) {
    TripleConfig tripleConfig = ConfigManager.getProtocolOrDefault(url).getTripleOrDefault();
    // Create a connection with manual flow control
    Http2Connection connection = createHttp2ClientConnection(tripleConfig);
    // ...
}

private Http2Connection createHttp2ClientConnection(TripleConfig tripleConfig) {
    Http2Connection connection = new DefaultHttp2Connection(false);
    float windowUpdateRatio = tripleConfig.getWindowUpdateRatioOrDefault();
    connection.local().flowController(
            new TripleHttp2LocalFlowController(connection, windowUpdateRatio));
    return connection;
}
```
#### 2. Http2TripleClientStream (client stream)
```java
@Override
protected TripleStreamChannelFuture initStreamChannel(Channel parent) {
    Http2StreamChannelBootstrap bootstrap = new Http2StreamChannelBootstrap(parent);
    // Disable automatic flow control
    bootstrap.option(Http2StreamChannelOption.AUTO_STREAM_FLOW_CONTROL, false);
    // ...
}

@Override
protected void consumeBytes(int numBytes) {
    // Look up the Http2Connection from the parent channel
    Http2Connection http2Connection = getHttp2Connection();
    Http2LocalFlowController localFlowController = http2Connection.local().flowController();
    // Ask the flow controller to consume the bytes
    localFlowController.consumeBytes(stream, numBytes);
}

private Http2Connection getHttp2Connection() {
    ChannelHandlerContext ctx = parent.pipeline().context(Http2FrameCodec.class);
    Http2FrameCodec codec = (Http2FrameCodec) ctx.handler();
    return codec.connection();
}
```
#### 3. AbstractTripleClientStream.ClientTransportListener (installs the decoder listener)
```java
void onHeaderReceived(Http2Headers headers) {
    // Create the decoder listener
    TriDecoder.Listener listener = new TriDecoder.Listener() {
        @Override
        public void bytesRead(int numBytes) {
            // Delegate to the abstract method implemented by the subclass
            consumeBytes(numBytes);
        }

        @Override
        public void onRawMessage(byte[] data) {
            AbstractTripleClientStream.this.listener.onMessage(data, isReturnTriException);
        }
    };
    deframer = new TriDecoder(decompressor, listener);
}
```
#### 4. TriDecoder (gRPC decoder)
```java
private void processBody() {
    int totalBytesRead = HEADER_LENGTH + requiredLength;
    byte[] stream;
    try {
        stream = compressedFlag ? getCompressedBody() : getUncompressedBody();
    } finally {
        // Notify flow control in a finally block
        listener.bytesRead(totalBytesRead);
    }
    listener.onRawMessage(stream);
}
```
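
The byte accounting in `processBody()` can be tried out in isolation with the sketch below. It assumes the gRPC length-prefixed message layout (a 1-byte compressed flag followed by a 4-byte big-endian length), which is presumably what `HEADER_LENGTH` covers. `LengthPrefixedDeframerSketch`, its `Listener` interface, and `putMessage` are hypothetical stand-ins, not the real `TriDecoder`; decompression is left out.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Illustrative deframer; not the Dubbo TriDecoder. */
final class LengthPrefixedDeframerSketch {
    static final int HEADER_LENGTH = 5; // assumed: 1-byte flag + 4-byte big-endian length

    interface Listener {
        void bytesRead(int numBytes);
        void onRawMessage(byte[] data);
    }

    /** Decodes every complete message in the buffer and leaves a trailing partial one unread. */
    static void deframe(ByteBuffer in, Listener listener) {
        while (in.remaining() >= HEADER_LENGTH) {
            in.mark();
            in.get();                 // compressed flag, ignored in this sketch
            int length = in.getInt(); // ByteBuffer reads big-endian by default
            if (length < 0) {
                throw new IllegalArgumentException("negative length: " + length);
            }
            if (in.remaining() < length) {
                in.reset();           // body incomplete, wait for more data
                return;
            }
            byte[] body = new byte[length];
            try {
                in.get(body);
            } finally {
                // header + body are reported together, mirroring totalBytesRead above
                listener.bytesRead(HEADER_LENGTH + length);
            }
            listener.onRawMessage(body);
        }
    }

    /** Demo helper: writes flag + length + payload. */
    static void putMessage(ByteBuffer out, byte[] payload) {
        out.put((byte) 0);
        out.putInt(payload.length);
        out.put(payload);
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(64);
        putMessage(buf, "hi".getBytes(StandardCharsets.UTF_8));
        putMessage(buf, "abc".getBytes(StandardCharsets.UTF_8));
        buf.flip();
        deframe(buf, new Listener() {
            @Override public void bytesRead(int numBytes) {
                System.out.println("bytesRead(" + numBytes + ")");
            }
            @Override public void onRawMessage(byte[] data) {
                System.out.println("message: " + new String(data, StandardCharsets.UTF_8));
            }
        });
    }
}
```

Reporting the header together with the body matters because the flow-control window is charged for every byte of the DATA frame payload, not just for the message bodies.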
### Client-side data path
```
DATA frame received (server response)
    │
    ▼
Http2FrameCodec.onDataRead() → returns 0, no automatic consumption
    │
    ▼
TripleHttp2LocalFlowController.receiveFlowControlledFrame()
    │  - shrink the stream window
    ▼
TripleHttp2ClientResponseHandler.onData()
    │
    ▼
ClientTransportListener.onData()
    │
    ▼
TriDecoder.deframe(data)
    │
    ▼
TriDecoder.processBody()
    │
    ├──► read / decompress the message
    │
    ├──► finally { listener.bytesRead(numBytes) }  ◄─── Key step
    │         │
    │         ▼
    │    TriDecoder.Listener.bytesRead()
    │         │
    │         ▼
    │    AbstractTripleClientStream.consumeBytes()
    │         │
    │         ▼
    │    Http2TripleClientStream.consumeBytes()
    │         │
    │         ▼
    │    TripleHttp2LocalFlowController.consumeBytes()
    │         │
    │         ▼
    │    [if windowUpdateRatio is reached] → send WINDOW_UPDATE
    │
    ▼
listener.onRawMessage() → application callback
```
---
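## Configuration
### How windowUpdateRatio works
Assume `initialWindowSize = 8MB` and `windowUpdateRatio = 0.5`:
1. The initial window is 8MB
2. Data arrives and the window shrinks
3. `consumeBytes()` is called to return bytes
4. Once 4MB (8MB × 0.5) has been returned in total, a `WINDOW_UPDATE` frame is sent
5. After receiving it, the remote peer can continue sending data
```
Window state:
Initial:          [████████████████████████████████] 8MB available
Receive 2MB:      [████████████████████████░░░░░░░░] 6MB available
  consumeBytes(2MB) - 2MB accumulated, below threshold
Receive 2MB:      [████████████████░░░░░░░░░░░░░░░░] 4MB available
  consumeBytes(2MB) - 4MB accumulated, 50% reached
  → send WINDOW_UPDATE (4MB)
Window restored:  [████████████████████████████████] 8MB available
```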
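
The trace above can be reproduced with a few lines of plain Java. This is a sketch of the rule as this section states it (an update is sent once the consumed bytes reach `windowUpdateRatio` × initial window), not the Netty implementation; `ReceiveWindowSketch` and its methods are hypothetical names. Netty's Javadoc describes the ratio as the window percentage below which a `WINDOW_UPDATE` is sent; at 0.5 the two readings give the same 4MB threshold.

```java
/** Illustrative receive-window bookkeeping; not the Netty flow controller. */
final class ReceiveWindowSketch {
    private final long threshold; // consumed bytes needed before an update is sent
    private long available;       // bytes the peer may still send
    private long pendingReturn;   // consumed bytes not yet handed back to the peer

    ReceiveWindowSketch(long initialWindow, double windowUpdateRatio) {
        this.threshold = (long) (initialWindow * windowUpdateRatio);
        this.available = initialWindow;
    }

    /** A DATA frame arrived: the window shrinks by its size. */
    void receive(long bytes) {
        if (bytes > available) {
            throw new IllegalStateException("flow-control window exceeded");
        }
        available -= bytes;
    }

    /** Returns the WINDOW_UPDATE increment that would be sent, or 0 if the threshold is not reached. */
    long consume(long bytes) {
        pendingReturn += bytes;
        if (pendingReturn < threshold) {
            return 0;
        }
        long increment = pendingReturn;
        available += increment;
        pendingReturn = 0;
        return increment;
    }

    long available() {
        return available;
    }

    public static void main(String[] args) {
        final long mb = 1024L * 1024L;
        ReceiveWindowSketch window = new ReceiveWindowSketch(8 * mb, 0.5);
        window.receive(2 * mb);
        System.out.println("update: " + window.consume(2 * mb) / mb + "MB, available: " + window.available() / mb + "MB");
        window.receive(2 * mb);
        System.out.println("update: " + window.consume(2 * mb) / mb + "MB, available: " + window.available() / mb + "MB");
    }
}
```

The first `consume` call returns 0 and leaves 6MB available; the second returns a 4MB increment and restores the window to 8MB, matching the trace.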
---
## Design summary
### 1. Automatic flow control is disabled at two layers
| Layer | Setting | Effect |
|-------|---------|--------|
| Http2Connection | TripleHttp2LocalFlowController | Controls when WINDOW_UPDATE is sent |
| Http2StreamChannel | AUTO_STREAM_FLOW_CONTROL=false | Stops Netty from sending WINDOW_UPDATE automatically |
### 2. bytesRead is called in a finally block
This keeps flow control working even when reading or decoding a message fails:
```java
try {
    rawMessage = readRawMessage();
} finally {
    listener.bytesRead(totalBytesRead); // must always run
}
```
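
A small self-contained sketch shows why the `finally` placement matters. `FinallyAccountingSketch`, `BytesReadListener`, and the `failRead` switch are hypothetical and exist only to simulate a decode failure; the point is that the byte count is reported on both the success path and the failure path.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: shows that the finally block reports bytes even when reading throws. */
final class FinallyAccountingSketch {

    /** Hypothetical stand-in for the listener's bytesRead callback. */
    interface BytesReadListener {
        void bytesRead(int numBytes);
    }

    static byte[] readMessage(byte[] frame, boolean failRead, BytesReadListener listener) {
        try {
            if (failRead) {
                throw new IllegalStateException("simulated decode failure");
            }
            return frame.clone();
        } finally {
            listener.bytesRead(frame.length); // runs on success and on failure
        }
    }

    public static void main(String[] args) {
        List<Integer> reported = new ArrayList<>();
        readMessage(new byte[3], false, n -> reported.add(n));
        try {
            readMessage(new byte[4], true, n -> reported.add(n));
        } catch (IllegalStateException expected) {
            System.out.println("read failed: " + expected.getMessage());
        }
        System.out.println("reported = " + reported);
    }
}
```

Both calls report their byte counts, so `reported` ends up as `[3, 4]`. Without the `finally`, the 4 bytes of the failed read would never be returned and the window would shrink permanently.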
### 3. EventLoop thread safety
`consumeBytes()` must run on the EventLoop thread:
```java
if (eventLoop().inEventLoop()) {
    localFlowController.consumeBytes(stream, numBytes);
} else {
    eventLoop().execute(() -> localFlowController.consumeBytes(stream, numBytes));
}
```
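
The same "run inline if already on the loop, otherwise hand off" pattern can be sketched without Netty. In the example below a single-threaded `ExecutorService` stands in for the EventLoop, and `LoopConfinedCounter` is a hypothetical class: its `consumed` field is only ever touched on the loop thread, so it needs no locking.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Illustrative thread confinement; a single-thread executor stands in for a Netty EventLoop. */
final class LoopConfinedCounter {
    private volatile Thread loopThread;
    private final ExecutorService loop = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "sketch-event-loop");
        t.setDaemon(true);
        loopThread = t;
        return t;
    });
    private long consumed; // read and written only on the loop thread

    boolean inLoop() {
        return Thread.currentThread() == loopThread;
    }

    void consumeBytes(int numBytes) {
        if (inLoop()) {
            consumed += numBytes;                     // already on the loop: run inline
        } else {
            loop.execute(() -> consumed += numBytes); // hop onto the loop thread
        }
    }

    /** Reads the counter on the loop thread, after all previously queued updates. */
    long awaitConsumed() {
        try {
            return loop.submit(() -> consumed).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("loop did not answer", e);
        }
    }

    void shutdown() {
        loop.shutdown();
    }

    public static void main(String[] args) {
        LoopConfinedCounter counter = new LoopConfinedCounter();
        counter.consumeBytes(10); // called from main, so both calls are handed off
        counter.consumeBytes(32);
        System.out.println(counter.awaitConsumed());
        counter.shutdown();
    }
}
```

Because the executor runs tasks in submission order, the read in `awaitConsumed()` observes both updates and the demo prints 42.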
---
## Class diagram
```mermaid
classDiagram
class TripleConfig {
+Float windowUpdateRatio
+Integer initialWindowSize
+getWindowUpdateRatioOrDefault() float
}
class TripleHttp2LocalFlowController {
+TripleHttp2LocalFlowController(connection, windowUpdateRatio)
+receiveFlowControlledFrame()
+consumeBytes(stream, numBytes)
}
class DefaultHttp2LocalFlowController {
+consumeBytes(stream, numBytes)
}
class H2StreamChannel {
<<interface>>
+consumeBytes(numBytes)
}
class NettyH2StreamChannel {
-Http2Connection http2Connection
+consumeBytes(numBytes)
}
class StreamingDecoder {
<<interface>>
+setFragmentListener(listener)
}
class FragmentListener {
<<interface>>
+bytesRead(numBytes)
+onFragmentMessage(rawMessage)
}
class AbstractTripleClientStream {
<<abstract>>
#consumeBytes(numBytes)*
}
class Http2TripleClientStream {
+consumeBytes(numBytes)
-getHttp2Connection()
}
TripleHttp2LocalFlowController --|> DefaultHttp2LocalFlowController
NettyH2StreamChannel ..|> H2StreamChannel
Http2TripleClientStream --|> AbstractTripleClientStream
NettyH2StreamChannel --> TripleHttp2LocalFlowController : uses
Http2TripleClientStream --> TripleHttp2LocalFlowController : uses
```
---