Copilot commented on code in PR #15957:
URL: https://github.com/apache/dubbo/pull/15957#discussion_r2653242494


##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/observer/CallStreamObserver.java:
##########
@@ -48,4 +48,11 @@ public interface CallStreamObserver<T> extends 
StreamObserver<T> {
      * specified.
      */
     void disableAutoFlowControl();
+
+    /**
+     * compatible method for gRPC

Review Comment:
   The comment "compatible method for gRPC" is unclear. It should explain what 
gRPC API this is compatible with, and why the compatibility is needed. For 
example, mention the specific gRPC class/interface this aligns with.
   ```suggestion
        * Compatibility method to mirror gRPC Java
        * {@code 
io.grpc.stub.CallStreamObserver#disableAutoInboundFlowControl()}.
        * <p>
        * This allows code written against gRPC's {@code CallStreamObserver} 
API to be
        * more easily reused with Dubbo by providing an equivalent entry point 
that
        * delegates to {@link #disableAutoFlowControl()}.
   ```



##########
dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/frame/RecordListener.java:
##########
@@ -21,6 +21,9 @@ public class RecordListener implements TriDecoder.Listener {
     int dataCount;
     boolean close;
 
+    @Override
+    public void bytesRead(int numBytes) {}

Review Comment:
   The empty bytesRead implementation should have a comment explaining why it's 
a no-op. For example, indicate that flow control is handled differently by the 
QUIC layer in HTTP/3, or reference the consumeBytes implementation below.
   ```suggestion
       public void bytesRead(int numBytes) {
           // Intentionally left blank in tests: byte-level flow control 
notifications
           // are not needed for these assertions, only raw message contents 
are recorded.
       }
   ```



##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2TripleClientStream.java:
##########
@@ -72,4 +90,68 @@ public void handlerAdded(ChannelHandlerContext ctx) {
         writeQueue.enqueue(CreateStreamQueueCommand.create(bootstrap, 
streamChannelFuture));
         return streamChannelFuture;
     }
+
+    @Override
+    protected void consumeBytes(int numBytes) {
+        if (numBytes <= 0) {
+            return;
+        }
+
+        // todo The current implementation is not optimal, and alternative 
implementations should be considered.

Review Comment:
   The comment on line 100 contains a typo: "todo" should be "TODO" (all caps) 
to follow standard code annotation conventions. This makes it easier for IDEs 
and tools to recognize it as a task marker.
   ```suggestion
           // TODO The current implementation is not optimal, and alternative 
implementations should be considered.
   ```



##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/frame/TriDecoder.java:
##########
@@ -145,11 +145,23 @@ private void processHeader() {
      * Processes the GRPC message body, which depending on frame header flags 
may be compressed.
      */
     private void processBody() {
-        // There is no reliable way to get the uncompressed size per message 
when it's compressed,
-        // because the uncompressed bytes are provided through an InputStream 
whose total size is
-        // unknown until all bytes are read, and we don't know when it happens.
-        byte[] stream = compressedFlag ? getCompressedBody() : 
getUncompressedBody();
+        // Calculate total bytes read: header + payload (before decompression)
+        int totalBytesRead = HEADER_LENGTH + requiredLength;
 
+        byte[] stream;
+        try {
+            // There is no reliable way to get the uncompressed size per 
message when it's compressed,
+            // because the uncompressed bytes are provided through an 
InputStream whose total size is
+            // unknown until all bytes are read, and we don't know when it 
happens.
+            stream = compressedFlag ? getCompressedBody() : 
getUncompressedBody();
+        } finally {
+            // Notify listener about bytes read for flow control immediately 
after reading bytes
+            // This must be in finally block to ensure flow control works even 
if reading fails
+            // Following gRPC's pattern: bytesRead is called as soon as bytes 
are consumed from input
+            listener.bytesRead(totalBytesRead);
+        }
+

Review Comment:
   The bytesRead notification in the finally block will be called even if an 
exception occurs during getCompressedBody or getUncompressedBody. However, if 
these methods fail partway through reading the buffer, the totalBytesRead value 
may not accurately reflect the actual bytes consumed. Consider whether this is 
the intended behavior or if bytesRead should only be called on successful 
completion.
   ```suggestion
           // There is no reliable way to get the uncompressed size per message 
when it's compressed,
           // because the uncompressed bytes are provided through an 
InputStream whose total size is
           // unknown until all bytes are read, and we don't know when it 
happens.
           stream = compressedFlag ? getCompressedBody() : 
getUncompressedBody();
   
           // Notify listener about bytes read for flow control only after 
successfully reading bytes
           // Following gRPC's pattern: bytesRead is called as soon as bytes 
are consumed from input
           listener.bytesRead(totalBytesRead);
   ```



##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java:
##########
@@ -32,6 +32,13 @@ public interface ClientCall {
      */
     interface Listener {
 
+        /**
+         * Whether the response is streaming response.
+         *
+         * @return

Review Comment:
   Missing Javadoc comment for the ClientCall.Listener.streamingResponse() 
method. The method should have proper documentation explaining what it returns 
and when it should return true vs false.
   ```suggestion
            * Whether the response is a streaming response.
            * <p>
            * Implementations should return {@code true} when the server is 
expected to send
            * zero or more response messages over time (server-streaming or 
bidi-streaming
            * calls). Return {@code false} when at most a single response 
message is expected
            * from the server (unary or non-streaming calls).
            *
            * @return {@code true} if the server may send multiple response 
messages,
            *         {@code false} if at most one response message is expected
   ```



##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2LocalFlowController.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.tri.transport;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController;
+import io.netty.handler.codec.http2.Http2Connection;
+import io.netty.handler.codec.http2.Http2Exception;
+import io.netty.handler.codec.http2.Http2Stream;
+
+/**
+ * Custom HTTP/2 local flow controller for Triple protocol with manual flow 
control.
+ *
+ * <p>This flow controller works together with {@code 
Http2StreamChannelOption.AUTO_STREAM_FLOW_CONTROL = false}
+ * to enable manual flow control. The complete mechanism requires two parts:
+ *
+ * <h3>1. Disable Netty's automatic WINDOW_UPDATE (at Http2StreamChannel 
level)</h3>
+ * <p>Set {@code Http2StreamChannelOption.AUTO_STREAM_FLOW_CONTROL = false} 
when creating Http2StreamChannel.
+ * This prevents Netty's AbstractHttp2StreamChannel from automatically sending 
WINDOW_UPDATE frames.
+ *
+ * <h3>2. Manual flow control pattern (at Http2Connection level)</h3>
+ * <ol>
+ *   <li>Data is received and tracked via {@link #receiveFlowControlledFrame} 
- decreases window size</li>
+ *   <li>Application processes the data (decoding, business logic)</li>
+ *   <li>Application calls {@link #consumeBytes} to return processed bytes</li>
+ *   <li>WINDOW_UPDATE frame is sent when consumed bytes reach threshold 
(default: 50% of initial window)</li>
+ * </ol>
+ *
+ * <h3>Flow Control Call Chain</h3>
+ * <pre>
+ * Server: StreamingDecoder.bytesRead() → FragmentListener.bytesRead() → 
H2StreamChannel.consumeBytes()
+ *         → Http2LocalFlowController.consumeBytes() → WINDOW_UPDATE
+ * Client: TriDecoder.Listener.bytesRead() → 
AbstractTripleClientStream.consumeBytes()
+ *         → Http2LocalFlowController.consumeBytes() → WINDOW_UPDATE
+ * </pre>
+ *
+ * @see org.apache.dubbo.remoting.http12.h2.H2StreamChannel#consumeBytes(int)
+ * @see 
io.netty.handler.codec.http2.Http2StreamChannelOption#AUTO_STREAM_FLOW_CONTROL
+ */
+public class TripleHttp2LocalFlowController extends 
DefaultHttp2LocalFlowController {
+
+    /**
+     * Creates a new flow controller with custom windowUpdateRatio.
+     *
+     * @param connection        the HTTP/2 connection
+     * @param windowUpdateRatio the ratio of consumed bytes to initial window 
size at which
+     *                          WINDOW_UPDATE frames are sent. Must be between 
0 (exclusive) and 1 (inclusive).
+     *                          For example, 0.5 means WINDOW_UPDATE is sent 
when 50% of the
+     *                          initial window has been consumed.
+     */
+    public TripleHttp2LocalFlowController(Http2Connection connection, float 
windowUpdateRatio) {
+        super(connection, windowUpdateRatio, true);
+    }
+
+    @Override
+    public void receiveFlowControlledFrame(Http2Stream stream, ByteBuf data, 
int padding, boolean endOfStream)
+            throws Http2Exception {
+        super.receiveFlowControlledFrame(stream, data, padding, endOfStream);
+    }

Review Comment:
   The overridden receiveFlowControlledFrame method simply calls super without 
any additional logic. This override serves no purpose and should be removed to 
reduce unnecessary code.
   ```suggestion
   
   ```



##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2TripleClientStream.java:
##########
@@ -72,4 +90,68 @@ public void handlerAdded(ChannelHandlerContext ctx) {
         writeQueue.enqueue(CreateStreamQueueCommand.create(bootstrap, 
streamChannelFuture));
         return streamChannelFuture;
     }
+
+    @Override
+    protected void consumeBytes(int numBytes) {
+        if (numBytes <= 0) {
+            return;
+        }
+
+        // todo The current implementation is not optimal, and alternative 
implementations should be considered.
+
+        Channel streamChannel = getStreamChannelFuture().getNow();
+        if (!(streamChannel instanceof Http2StreamChannel)) {
+            return;
+        }
+
+        Http2StreamChannel http2StreamChannel = (Http2StreamChannel) 
streamChannel;
+

Review Comment:
   Potential race condition: getStreamChannelFuture().getNow() may return null 
if the future hasn't completed yet. While the code handles the case where 
streamChannel is not an Http2StreamChannel, it doesn't explicitly handle the 
null case. Consider adding a null check or using a different approach to wait 
for the channel to be available.
   ```suggestion
           TripleStreamChannelFuture streamChannelFuture = 
getStreamChannelFuture();
           Channel streamChannel = streamChannelFuture.getNow();
           if (streamChannel == null) {
               // Stream channel not ready yet; defer consumeBytes until it 
becomes available.
               streamChannelFuture.addListener(future -> {
                   Channel ch = streamChannelFuture.getNow();
                   if (ch instanceof Http2StreamChannel) {
                       doConsumeBytes((Http2StreamChannel) ch, numBytes);
                   }
               });
               return;
           }
   
           if (!(streamChannel instanceof Http2StreamChannel)) {
               return;
           }
   
           Http2StreamChannel http2StreamChannel = (Http2StreamChannel) 
streamChannel;
           doConsumeBytes(http2StreamChannel, numBytes);
       }
   
       private void doConsumeBytes(Http2StreamChannel http2StreamChannel, int 
numBytes) {
   ```



##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java:
##########
@@ -32,6 +32,13 @@ public interface ClientCall {
      */
     interface Listener {
 
+        /**
+         * Whether the response is streaming response.
+         *
+         * @return

Review Comment:
   The Javadoc comment has an incomplete @return tag with no description. 
Either add the description or remove the @return tag if it follows a convention 
where boolean return types are self-explanatory.
   ```suggestion
            * @return true if the response is a streaming response
   ```



##########
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyH2StreamChannel.java:
##########
@@ -89,4 +103,40 @@ public CompletableFuture<Void> writeResetFrame(long 
errorCode) {
         
http2StreamChannel.write(resetFrame).addListener(nettyHttpChannelFutureListener);
         return nettyHttpChannelFutureListener;
     }
+
+    @Override
+    public void consumeBytes(int numBytes) throws Exception {
+        if (numBytes <= 0) {
+            return;
+        }
+
+        if (http2Connection == null) {
+            LOGGER.debug("Http2Connection not available, skip consumeBytes");
+            return;
+        }
+
+        Http2LocalFlowController localFlowController = 
http2Connection.local().flowController();
+
+        // Get the stream from connection using stream id
+        int streamId = http2StreamChannel.stream().id();

Review Comment:
   Potential NullPointerException: If http2StreamChannel.stream() returns null 
at line 121, the code will throw a NullPointerException. While there's a check 
for the stream being null after retrieving it from the connection (line 123), 
the stream().id() call at line 121 could fail first if stream() itself returns 
null. Consider checking if stream() is null before calling id().
   ```suggestion
           // Get the stream from Http2StreamChannel and then from connection 
using stream id
           Http2Stream channelStream = http2StreamChannel.stream();
           if (channelStream == null) {
               LOGGER.debug("Http2StreamChannel.stream() is null, skip 
consumeBytes");
               return;
           }
   
           int streamId = channelStream.id();
   ```



##########
dubbo-common/src/main/java/org/apache/dubbo/config/nested/TripleConfig.java:
##########
@@ -355,6 +367,19 @@ public void setMaxMessageSize(Integer maxMessageSize) {
         this.maxMessageSize = maxMessageSize;
     }
 
+    public Float getWindowUpdateRatio() {
+        return windowUpdateRatio;
+    }
+
+    @Parameter(excluded = true)
+    public float getWindowUpdateRatioOrDefault() {
+        return windowUpdateRatio == null ? DEFAULT_WINDOW_UPDATE_RATIO : 
windowUpdateRatio;
+    }
+
+    public void setWindowUpdateRatio(Float windowUpdateRatio) {

Review Comment:
   The validation for windowUpdateRatio is missing. The documentation states it 
must be between 0 (exclusive) and 1 (inclusive), but there's no validation in 
the setter to enforce this range. Consider adding validation to prevent invalid 
values.
   ```suggestion
       public void setWindowUpdateRatio(Float windowUpdateRatio) {
           if (windowUpdateRatio != null && (windowUpdateRatio <= 0.0f || 
windowUpdateRatio > 1.0f)) {
               throw new IllegalArgumentException("windowUpdateRatio must be > 
0 and <= 1, but was: " + windowUpdateRatio);
           }
   ```



##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java:
##########
@@ -89,6 +96,8 @@ interface Listener {
      */
     boolean isAutoRequest();
 

Review Comment:
   Missing Javadoc comment for the setAutoRequestWithInitial method. This new 
API method should have proper documentation explaining what it does, the 
purpose of the initialRequest parameter, and how it differs from setAutoRequest.
   ```suggestion
   
       /**
        * Enable auto request for this call with an initial number of messages 
to request.
        * <p>
        * This variant of auto request allows specifying how many response 
messages should be
        * requested from the server immediately when the call starts or auto 
request is enabled.
        * It is similar to {@link #setAutoRequest(boolean)} but also configures 
the initial
        * {@link #request(int) request} amount.
        *
        * @param initialRequest the initial number of messages to request from 
the server
        */
   ```



##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/observer/ClientCallToObserverAdapter.java:
##########
@@ -23,10 +23,12 @@
 public class ClientCallToObserverAdapter<T> extends 
CancelableStreamObserver<T> implements ClientStreamObserver<T> {
 
     private final ClientCall call;
+    private final boolean streamingResponse;
     private boolean terminated;
 
-    public ClientCallToObserverAdapter(ClientCall call) {
+    public ClientCallToObserverAdapter(ClientCall call, boolean 
streamingResponse) {
         this.call = call;
+        this.streamingResponse = streamingResponse;

Review Comment:
   The unused field streamingResponse in ClientCallToObserverAdapter is stored 
but never used. Either remove the field or implement the intended functionality 
that requires it.



##########
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java:
##########
@@ -184,7 +184,20 @@ private void skipOffset(InputStream inputStream, int 
lengthFieldOffset) throws I
     }
 
     private void processBody() throws IOException {
-        byte[] rawMessage = readRawMessage(accumulate, requiredLength);
+        // Calculate total bytes read: header (offset + length field) + payload
+        int totalBytesRead = lengthFieldOffset + lengthFieldLength + 
requiredLength;
+
+        byte[] rawMessage;
+        try {
+            rawMessage = readRawMessage(accumulate, requiredLength);
+        } finally {
+            // Notify listener about bytes read for flow control immediately 
after reading bytes
+            // This must be in finally block to ensure flow control works even 
if reading fails
+            // Following gRPC's pattern: bytesRead is called as soon as bytes 
are consumed from input
+            listener.bytesRead(totalBytesRead);
+        }
+

Review Comment:
   The bytesRead notification in the finally block will be called even if an 
exception occurs during readRawMessage. However, if readRawMessage fails 
partway through reading the buffer, the totalBytesRead value may not accurately 
reflect the actual bytes consumed. Consider whether this is the intended 
behavior or if bytesRead should only be called on successful completion.
   ```suggestion
           byte[] rawMessage = readRawMessage(accumulate, requiredLength);
   
           // Notify listener about bytes read for flow control immediately 
after successfully reading bytes
           // Following gRPC's pattern: bytesRead is called as soon as bytes 
are consumed from input
           listener.bytesRead(totalBytesRead);
   ```



##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2TripleClientStream.java:
##########
@@ -72,4 +90,68 @@ public void handlerAdded(ChannelHandlerContext ctx) {
         writeQueue.enqueue(CreateStreamQueueCommand.create(bootstrap, 
streamChannelFuture));
         return streamChannelFuture;
     }
+
+    @Override
+    protected void consumeBytes(int numBytes) {
+        if (numBytes <= 0) {
+            return;
+        }
+
+        // todo The current implementation is not optimal, and alternative 
implementations should be considered.
+
+        Channel streamChannel = getStreamChannelFuture().getNow();
+        if (!(streamChannel instanceof Http2StreamChannel)) {
+            return;
+        }
+
+        Http2StreamChannel http2StreamChannel = (Http2StreamChannel) 
streamChannel;
+
+        // Get Http2Connection from parent channel pipeline
+        Http2Connection http2Connection = getHttp2Connection();
+        if (http2Connection == null) {
+            LOGGER.debug("Http2Connection not available for flow control");
+            return;
+        }
+
+        Http2LocalFlowController localFlowController = 
http2Connection.local().flowController();
+        int streamId = http2StreamChannel.stream().id();

Review Comment:
   Potential NullPointerException: If getStreamChannelFuture().getNow() returns 
null or if http2StreamChannel.stream() returns null, the subsequent calls will 
throw NullPointerException. The code checks if streamChannel is an instance of 
Http2StreamChannel but doesn't check for null before the instanceof check. Add 
an explicit null check before line 103.



##########
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyH2StreamChannel.java:
##########
@@ -89,4 +103,40 @@ public CompletableFuture<Void> writeResetFrame(long 
errorCode) {
         
http2StreamChannel.write(resetFrame).addListener(nettyHttpChannelFutureListener);
         return nettyHttpChannelFutureListener;
     }
+
+    @Override
+    public void consumeBytes(int numBytes) throws Exception {
+        if (numBytes <= 0) {
+            return;
+        }
+
+        if (http2Connection == null) {
+            LOGGER.debug("Http2Connection not available, skip consumeBytes");
+            return;
+        }
+
+        Http2LocalFlowController localFlowController = 
http2Connection.local().flowController();
+
+        // Get the stream from connection using stream id
+        int streamId = http2StreamChannel.stream().id();
+        Http2Stream stream = http2Connection.stream(streamId);
+        if (stream == null) {
+            LOGGER.debug("Stream {} not found in connection, skip 
consumeBytes", streamId);
+            return;
+        }
+
+        // Consume bytes to trigger WINDOW_UPDATE frame
+        // This must be executed in the event loop thread
+        if (http2StreamChannel.eventLoop().inEventLoop()) {
+            localFlowController.consumeBytes(stream, numBytes);
+        } else {
+            http2StreamChannel.eventLoop().execute(() -> {
+                try {
+                    localFlowController.consumeBytes(stream, numBytes);
+                } catch (Exception e) {
+                    LOGGER.warn(PROTOCOL_FAILED_RESPONSE, "", "", "Failed to 
consumeBytes for stream " + streamId, e);
+                }
+            });

Review Comment:
   Inconsistent error handling: When in the event loop (line 131), exceptions 
from consumeBytes are propagated (declared in the method signature), but when 
executed asynchronously (line 135), exceptions are caught and logged. This 
inconsistency could lead to different behavior depending on which thread calls 
the method. Consider catching the exception in both cases or propagating it in 
both cases for consistency.
   ```suggestion
               http2StreamChannel.eventLoop().execute(() -> 
localFlowController.consumeBytes(stream, numBytes));
   ```



##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStreamObserver.java:
##########
@@ -30,4 +30,6 @@ public interface ClientStreamObserver<T> extends 
CallStreamObserver<T> {
     default void disableAutoRequest() {
         disableAutoFlowControl();
     }
+

Review Comment:
   Missing Javadoc comment for the disableAutoRequestWithInitial method. This 
new API method should have proper documentation explaining what it does, the 
purpose of the request parameter, and how it differs from disableAutoRequest or 
disableAutoFlowControl.
   ```suggestion
   
       /**
        * Swaps to manual flow control and configures an initial number of 
messages to request
        * when the call starts.
        * <p>
        * This is similar to {@link #disableAutoRequest()} but allows 
specifying how many messages
        * should be requested immediately once the call is established, as if 
{@link #request(int)}
        * were invoked with the given {@code request} value after disabling 
automatic flow control.
        *
        * @param request the number of messages to request initially after auto 
flow control has been
        *                disabled; must be a non-negative number.
        */
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to