EarthChen commented on PR #15957:
URL: https://github.com/apache/dubbo/pull/15957#issuecomment-3707973285

   ## 应用层背压
   
   应用层背压机制允许应用程序在发送数据时感知底层传输层的状态,避免在传输层缓冲区满时继续发送数据导致内存溢出或性能下降。
   
   ### 核心 API
   
   | API | 说明 |
   |-----|------|
   | `isReady()` | 检查当前流是否可写,返回 `true` 表示可以发送更多消息 |
   | `setOnReadyHandler(Runnable)` | 设置回调,当流从不可写变为可写时触发 |
   
   ###  设计参考
   
   本实现完全对齐 gRPC Java 的设计模式:
   - `CallStreamObserver.isReady()` 和 
`CallStreamObserver.setOnReadyHandler(Runnable)`
   - `ClientCall.Listener.onReady()`
   
   ##  客户端实现
   
   ###  核心类关系
   
   ```
   
┌─────────────────────────────────────────────────────────────────────────────┐
   │                              用户代码                                        │
   │  CallStreamObserver<T> observer = ...;                                     
 │
   │  observer.setOnReadyHandler(() -> { /* 恢复发送 */ });                       │
   │  if (observer.isReady()) { observer.onNext(data); }                        
 │
   
└─────────────────────────────────────────────────────────────────────────────┘
                                         │
                                         ▼
   
┌─────────────────────────────────────────────────────────────────────────────┐
   │                    ClientCallToObserverAdapter                             
  │
   │  - 实现 CallStreamObserver 接口                                              │
   │  - 存储 onReadyHandler                                                       
│
   │  - isReady() 委托给 ClientCall.isReady()                                     │
   
└─────────────────────────────────────────────────────────────────────────────┘
                                         │
                       ┌─────────────────┴─────────────────┐
                       ▼                                   ▼
   ┌──────────────────────────────────┐    
┌──────────────────────────────────────┐
   │     TripleClientCall             │    │  
ObserverToClientCallListenerAdapter │
   │  - 实现 ClientCall               │    │  - 实现 ClientCall.Listener          │
   │  - 实现 ClientStream.Listener    │    │  - 持有 ClientCallToObserverAdapter  │
   │  - isReady() 委托给 stream       │    │  - onReady() 触发 onReadyHandler     │
   └──────────────────────────────────┘    
└──────────────────────────────────────┘
                       │
                       ▼
   
┌─────────────────────────────────────────────────────────────────────────────┐
   │                    AbstractTripleClientStream                              
  │
   │  - 实现 ClientStream                                                         
│
   │  - isReady() 返回 channel.isWritable()                                       
│
   │  - onWritabilityChanged() 调用 listener.onReady()                            
│
   
└─────────────────────────────────────────────────────────────────────────────┘
                       │
                       ▼
   
┌─────────────────────────────────────────────────────────────────────────────┐
   │                 Netty Http2StreamChannel                                   
  │
   │  - isWritable() 底层可写性检查                                                │
   │  - channelWritabilityChanged 事件                                            
│
   
└─────────────────────────────────────────────────────────────────────────────┘
   ```
   
   ###  isReady() 调用链
   
   ```
   用户: observer.isReady()
           │
           ▼
   ClientCallToObserverAdapter.isReady()
           │ return call.isReady()
           ▼
   TripleClientCall.isReady()
           │ 检查 canceled/done 状态
           │ return stream.isReady()
           ▼
   AbstractTripleClientStream.isReady()
           │ Channel channel = streamChannelFuture.getNow()
           │ return channel.isWritable()
           ▼
   Netty: Http2StreamChannel.isWritable()
   ```
   
   **代码实现:**
   
   ```java
   // ClientCallToObserverAdapter.java
   @Override
   public boolean isReady() {
       return call.isReady();
   }
   
   // TripleClientCall.java
   @Override
   public boolean isReady() {
       if (canceled) return false;
       if (done) return false;
       return stream.isReady();
   }
   
   // AbstractTripleClientStream.java
   @Override
   public boolean isReady() {
       Channel channel = streamChannelFuture.getNow();
       if (channel == null) return false;
       return channel.isWritable();
   }
   ```
   
   ###  onReadyHandler 触发链
   
   当 Netty Channel 从不可写变为可写时,触发以下调用链:
   
   ```
   Netty: channelWritabilityChanged 事件
           │
           ▼
   TripleHttp2ClientResponseHandler.channelWritabilityChanged(ctx)
           │ transportListener.onWritabilityChanged()
           ▼
   ClientTransportListener.onWritabilityChanged()  [H2TransportListener 实现]
           │ AbstractTripleClientStream.this.onWritabilityChanged()
           ▼
   AbstractTripleClientStream.onWritabilityChanged()
           │ 检查 channel.isWritable()
           │ listener.onReady() [同步调用]
           ▼
   TripleClientCall.onReady()  [ClientStream.Listener 实现]
           │ executor.execute(() -> listener.onReady()) [异步执行]
           ▼
   ObserverToClientCallListenerAdapter.onReady()  [ClientCall.Listener 实现]
           │ requestAdapter.getOnReadyHandler().run()
           ▼
   用户设置的 Runnable 被执行 [在业务 executor 线程中]
   ```
   
   **代码实现:**
   
   ```java
   // TripleHttp2ClientResponseHandler.java
   @Override
   public void channelWritabilityChanged(ChannelHandlerContext ctx) throws 
Exception {
       transportListener.onWritabilityChanged();
       super.channelWritabilityChanged(ctx);
   }
   
   // AbstractTripleClientStream.ClientTransportListener
   @Override
   public void onWritabilityChanged() {
       AbstractTripleClientStream.this.onWritabilityChanged();
   }
   
   // AbstractTripleClientStream.java
   protected void onWritabilityChanged() {
       Channel channel = streamChannelFuture.getNow();
       if (channel != null && channel.isWritable()) {
           // 同步调用,由 TripleClientCall.onReady() 负责异步执行
           listener.onReady();
       }
   }
   
   // TripleClientCall.java
   @Override
   public void onReady() {
       if (listener == null) {
           return;
       }
       // ObserverToClientCallListenerAdapter.onReady() triggers the 
onReadyHandler
       executor.execute(() -> {
           try {
               listener.onReady();
           } catch (Throwable t) {
               LOGGER.warn(PROTOCOL_STREAM_LISTENER, "", "", 
                   "Error executing listener.onReady()", t);
           }
       });
   }
   
   // ObserverToClientCallListenerAdapter.java
   @Override
   public void onReady() {
       if (requestAdapter == null) return;
       Runnable handler = requestAdapter.getOnReadyHandler();
       if (handler == null) return;
       handler.run();
   }
   ```
   
   ###  setOnReadyHandler 设置流程
   
   ```
   用户: observer.setOnReadyHandler(runnable)
           │
           ▼
   ClientCallToObserverAdapter.setOnReadyHandler(runnable)
           │ this.onReadyHandler = runnable  // 存储在本地
           │
           └──────────────────────────────────────────────────────
                                                                 │
   TripleInvoker.streamCall() 中建立关联:                         │
           │                                                     │
           ▼                                                     │
   listener.setRequestAdapter(adapter)                           │
           │                                                     │
           ▼                                                     │
   ObserverToClientCallListenerAdapter 持有 requestAdapter 引用 ──┘
           │
           ▼ (当 onReady 被调用时)
   requestAdapter.getOnReadyHandler().run()
   ```
   
   **代码实现:**
   
   ```java
   // ClientCallToObserverAdapter.java
   private Runnable onReadyHandler;
   
   @Override
   public void setOnReadyHandler(Runnable onReadyHandler) {
       this.onReadyHandler = onReadyHandler;
   }
   
   public Runnable getOnReadyHandler() {
       return onReadyHandler;
   }
   
   // TripleInvoker.java
   StreamObserver<Object> streamCall(...) {
       ObserverToClientCallListenerAdapter listener = 
           new ObserverToClientCallListenerAdapter(responseObserver);
       StreamObserver<Object> streamObserver = call.start(metadata, listener);
       
       // 建立关联,让 listener 能够访问 adapter 的 onReadyHandler
       if (streamObserver instanceof ClientCallToObserverAdapter) {
           listener.setRequestAdapter((ClientCallToObserverAdapter<Object>) 
streamObserver);
       }
       // ...
   }
   ```
   
   ##  服务端实现
   
   服务端的背压机制通过 `H2StreamChannel` 和 `Http2ServerChannelObserver` 实现。
   
   ###  核心类关系
   
   ```
   
┌─────────────────────────────────────────────────────────────────────────────┐
   │                    Http2ServerChannelObserver                              
  │
   │  - 实现 CallStreamObserver                                                   
│
   │  - isReady() 委托给 H2StreamChannel.isReady()                                │
   │  - setOnReadyHandler() 设置回调                                              │
   
└─────────────────────────────────────────────────────────────────────────────┘
                                         │
                                         ▼
   
┌─────────────────────────────────────────────────────────────────────────────┐
   │                       NettyH2StreamChannel                                 
  │
   │  - 实现 H2StreamChannel                                                      
│
   │  - isReady() 返回 http2StreamChannel.isWritable()                            
│
   │  - onWritabilityChanged() 触发 onReadyHandler                                
│
   
└─────────────────────────────────────────────────────────────────────────────┘
                                         │
                                         ▼
   
┌─────────────────────────────────────────────────────────────────────────────┐
   │                 Netty Http2StreamChannel                                   
  │
   │  - isWritable() 底层可写性检查                                                │
   
└─────────────────────────────────────────────────────────────────────────────┘
   ```
   
   ###  服务端 onWritabilityChanged 触发链
   
   ```
   Netty: channelWritabilityChanged 事件
           │
           ▼
   NettyHttp2FrameHandler.channelWritabilityChanged(ctx)
           │ transportListener.onWritabilityChanged()
           ▼
   Http2TransportListener.onWritabilityChanged()
           │ 由具体实现处理
           ▼
   触发服务端的背压回调
   ```
   
   
   ##  使用示例
   
   ###  客户端使用背压
   
   ```java
   // 获取请求 StreamObserver
   StreamObserver<Request> requestObserver = 
stub.biStreamCall(responseObserver);
   
   // 转换为 CallStreamObserver 以使用背压 API
   CallStreamObserver<Request> callObserver = (CallStreamObserver<Request>) 
requestObserver;
   
   // 禁用自动流控
   callObserver.disableAutoFlowControl();
   
   // 设置 ready 回调
   callObserver.setOnReadyHandler(() -> {
       // 当流变为可写时,恢复发送数据
       while (callObserver.isReady() && hasMoreData()) {
           callObserver.onNext(getNextData());
       }
   });
   
   // 初始发送(如果可写)
   while (callObserver.isReady() && hasMoreData()) {
       callObserver.onNext(getNextData());
   }
   ```
   
   ###  服务端使用背压
   
   ```java
   @Override
   public StreamObserver<Request> biStreamCall(StreamObserver<Response> 
responseObserver) {
       // 转换为 ServerCallStreamObserver
       ServerCallStreamObserver<Response> serverObserver = 
           (ServerCallStreamObserver<Response>) responseObserver;
       
       // 设置 ready 回调
       serverObserver.setOnReadyHandler(() -> {
           while (serverObserver.isReady() && hasMoreResponses()) {
               serverObserver.onNext(getNextResponse());
           }
       });
       
       return new StreamObserver<Request>() {
           @Override
           public void onNext(Request request) {
               // 处理请求
           }
           // ...
       };
   }
   ```
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to