Copilot commented on code in PR #15957:
URL: https://github.com/apache/dubbo/pull/15957#discussion_r2653242494
##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/observer/CallStreamObserver.java:
##########
@@ -48,4 +48,11 @@ public interface CallStreamObserver<T> extends StreamObserver<T> {
* specified.
*/
void disableAutoFlowControl();
+
+ /**
+ * compatible method for gRPC
Review Comment:
The comment "compatible method for gRPC" is unclear. It should explain what
gRPC API this is compatible with, and why the compatibility is needed. For
example, mention the specific gRPC class/interface this aligns with.
```suggestion
* Compatibility method to mirror gRPC Java
* {@code io.grpc.stub.CallStreamObserver#disableAutoInboundFlowControl()}.
* <p>
* This allows code written against gRPC's {@code CallStreamObserver} API to be
* more easily reused with Dubbo by providing an equivalent entry point that
* delegates to {@link #disableAutoFlowControl()}.
```
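For reference, a minimal sketch of the delegation that the suggested Javadoc describes. It assumes the new Dubbo method is a default method sharing the name of gRPC's `disableAutoInboundFlowControl()`; the diff excerpt does not show the actual signature, so the method name, the simplified interfaces, and the `RecordingObserver` class are all hypothetical.

```java
// Simplified stand-ins for Dubbo's observer interfaces; not the real definitions.
interface StreamObserver<T> {
    void onNext(T data);
}

interface CallStreamObserver<T> extends StreamObserver<T> {
    void disableAutoFlowControl();

    // Hypothetical gRPC-compatible entry point: same name as
    // io.grpc.stub.CallStreamObserver#disableAutoInboundFlowControl(),
    // delegating to the Dubbo-native method.
    default void disableAutoInboundFlowControl() {
        disableAutoFlowControl();
    }
}

// Hypothetical implementation that records whether auto flow control is still on.
class RecordingObserver implements CallStreamObserver<String> {
    boolean autoFlowControl = true;

    @Override
    public void onNext(String data) {
        // No-op: message handling is irrelevant to this sketch.
    }

    @Override
    public void disableAutoFlowControl() {
        autoFlowControl = false;
    }
}
```

With this shape, code ported from gRPC can keep calling the gRPC-style name while implementations only override `disableAutoFlowControl()`.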
##########
dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/frame/RecordListener.java:
##########
@@ -21,6 +21,9 @@ public class RecordListener implements TriDecoder.Listener {
int dataCount;
boolean close;
+ @Override
+ public void bytesRead(int numBytes) {}
Review Comment:
The empty bytesRead implementation should have a comment explaining why it's
a no-op. For example, indicate that flow control is handled differently by the
QUIC layer in HTTP/3, or reference the consumeBytes implementation below.
```suggestion
public void bytesRead(int numBytes) {
// Intentionally left blank in tests: byte-level flow control notifications
// are not needed for these assertions, only raw message contents are recorded.
}
```
##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2TripleClientStream.java:
##########
@@ -72,4 +90,68 @@ public void handlerAdded(ChannelHandlerContext ctx) {
writeQueue.enqueue(CreateStreamQueueCommand.create(bootstrap,
streamChannelFuture));
return streamChannelFuture;
}
+
+ @Override
+ protected void consumeBytes(int numBytes) {
+ if (numBytes <= 0) {
+ return;
+ }
+
+ // todo The current implementation is not optimal, and alternative implementations should be considered.
Review Comment:
The task marker in the comment on line 100 is written as "todo"; it should be
"TODO" (all caps) to follow standard code annotation conventions. This makes it
easier for IDEs and tools to recognize it as a task marker.
```suggestion
// TODO The current implementation is not optimal, and alternative implementations should be considered.
```
##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/frame/TriDecoder.java:
##########
@@ -145,11 +145,23 @@ private void processHeader() {
* Processes the GRPC message body, which depending on frame header flags may be compressed.
*/
private void processBody() {
- // There is no reliable way to get the uncompressed size per message when it's compressed,
- // because the uncompressed bytes are provided through an InputStream whose total size is
- // unknown until all bytes are read, and we don't know when it happens.
- byte[] stream = compressedFlag ? getCompressedBody() : getUncompressedBody();
+ // Calculate total bytes read: header + payload (before decompression)
+ int totalBytesRead = HEADER_LENGTH + requiredLength;
+ byte[] stream;
+ try {
+ // There is no reliable way to get the uncompressed size per message when it's compressed,
+ // because the uncompressed bytes are provided through an InputStream whose total size is
+ // unknown until all bytes are read, and we don't know when it happens.
+ stream = compressedFlag ? getCompressedBody() : getUncompressedBody();
+ } finally {
+ // Notify listener about bytes read for flow control immediately after reading bytes
+ // This must be in finally block to ensure flow control works even if reading fails
+ // Following gRPC's pattern: bytesRead is called as soon as bytes are consumed from input
+ listener.bytesRead(totalBytesRead);
+ }
+
Review Comment:
The bytesRead notification in the finally block will be called even if an
exception occurs during getCompressedBody or getUncompressedBody. However, if
these methods fail partway through reading the buffer, the totalBytesRead value
may not accurately reflect the actual bytes consumed. Consider whether this is
the intended behavior or if bytesRead should only be called on successful
completion.
```suggestion
// There is no reliable way to get the uncompressed size per message when it's compressed,
// because the uncompressed bytes are provided through an InputStream whose total size is
// unknown until all bytes are read, and we don't know when it happens.
stream = compressedFlag ? getCompressedBody() : getUncompressedBody();
// Notify listener about bytes read for flow control only after successfully reading bytes
// Following gRPC's pattern: bytesRead is called as soon as bytes are consumed from input
listener.bytesRead(totalBytesRead);
```
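To make the trade-off concrete, here is a minimal sketch contrasting the two reporting strategies. `FrameReader`, `BytesReadListener`, and the `failDecode` switch are hypothetical stand-ins rather than Dubbo types, and the header length of 5 assumes the standard gRPC length-prefixed framing (1 flag byte plus 4 length bytes).

```java
// Hypothetical stand-in for a decoder listener's bytesRead(int) callback.
interface BytesReadListener {
    void bytesRead(int numBytes);
}

class FrameReader {
    // gRPC length-prefixed framing: 1 compressed-flag byte + 4 length bytes (assumed here).
    static final int HEADER_LENGTH = 5;

    private final BytesReadListener listener;

    FrameReader(BytesReadListener listener) {
        this.listener = listener;
    }

    // Variant 1: report in finally, so the bytes are credited even if decoding throws.
    byte[] readAlwaysReport(byte[] payload, boolean failDecode) {
        int total = HEADER_LENGTH + payload.length;
        try {
            return decode(payload, failDecode);
        } finally {
            listener.bytesRead(total);
        }
    }

    // Variant 2: report only after decoding returned normally.
    byte[] readReportOnSuccess(byte[] payload, boolean failDecode) {
        int total = HEADER_LENGTH + payload.length;
        byte[] body = decode(payload, failDecode);
        listener.bytesRead(total);
        return body;
    }

    // Simulates a body read that can fail, e.g. during decompression.
    private static byte[] decode(byte[] payload, boolean fail) {
        if (fail) {
            throw new IllegalStateException("simulated decompression failure");
        }
        return payload.clone();
    }
}
```

In variant 1 a failed decode still returns the full frame size to the flow-control window; in variant 2 those bytes are never reported, so which variant is correct depends on whether the stream stays open after a decode failure.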
##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java:
##########
@@ -32,6 +32,13 @@ public interface ClientCall {
*/
interface Listener {
+ /**
+ * Whether the response is streaming response.
+ *
+ * @return
Review Comment:
Incomplete Javadoc for the ClientCall.Listener.streamingResponse() method: the
@return tag is empty and the description does not say when the method should
return true vs false. The documentation should explain what the method returns.
```suggestion
* Whether the response is a streaming response.
* <p>
* Implementations should return {@code true} when the server is expected to send
* zero or more response messages over time (server-streaming or bidi-streaming
* calls). Return {@code false} when at most a single response message is expected
* from the server (unary or non-streaming calls).
*
* @return {@code true} if the server may send multiple response messages,
* {@code false} if at most one response message is expected
```
##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/TripleHttp2LocalFlowController.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.tri.transport;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.handler.codec.http2.DefaultHttp2LocalFlowController;
+import io.netty.handler.codec.http2.Http2Connection;
+import io.netty.handler.codec.http2.Http2Exception;
+import io.netty.handler.codec.http2.Http2Stream;
+
+/**
+ * Custom HTTP/2 local flow controller for Triple protocol with manual flow control.
+ *
+ * <p>This flow controller works together with {@code Http2StreamChannelOption.AUTO_STREAM_FLOW_CONTROL = false}
+ * to enable manual flow control. The complete mechanism requires two parts:
+ *
+ * <h3>1. Disable Netty's automatic WINDOW_UPDATE (at Http2StreamChannel level)</h3>
+ * <p>Set {@code Http2StreamChannelOption.AUTO_STREAM_FLOW_CONTROL = false} when creating Http2StreamChannel.
+ * This prevents Netty's AbstractHttp2StreamChannel from automatically sending WINDOW_UPDATE frames.
+ *
+ * <h3>2. Manual flow control pattern (at Http2Connection level)</h3>
+ * <ol>
+ * <li>Data is received and tracked via {@link #receiveFlowControlledFrame} - decreases window size</li>
+ * <li>Application processes the data (decoding, business logic)</li>
+ * <li>Application calls {@link #consumeBytes} to return processed bytes</li>
+ * <li>WINDOW_UPDATE frame is sent when consumed bytes reach threshold (default: 50% of initial window)</li>
+ * </ol>
+ *
+ * <h3>Flow Control Call Chain</h3>
+ * <pre>
+ * Server: StreamingDecoder.bytesRead() → FragmentListener.bytesRead() → H2StreamChannel.consumeBytes()
+ * → Http2LocalFlowController.consumeBytes() → WINDOW_UPDATE
+ * Client: TriDecoder.Listener.bytesRead() → AbstractTripleClientStream.consumeBytes()
+ * → Http2LocalFlowController.consumeBytes() → WINDOW_UPDATE
+ * </pre>
+ *
+ * @see org.apache.dubbo.remoting.http12.h2.H2StreamChannel#consumeBytes(int)
+ * @see io.netty.handler.codec.http2.Http2StreamChannelOption#AUTO_STREAM_FLOW_CONTROL
+ */
+public class TripleHttp2LocalFlowController extends DefaultHttp2LocalFlowController {
+
+ /**
+ * Creates a new flow controller with custom windowUpdateRatio.
+ *
+ * @param connection the HTTP/2 connection
+ * @param windowUpdateRatio the ratio of consumed bytes to initial window size at which
+ * WINDOW_UPDATE frames are sent. Must be between 0 (exclusive) and 1 (inclusive).
+ * For example, 0.5 means WINDOW_UPDATE is sent when 50% of the
+ * initial window has been consumed.
+ */
+ public TripleHttp2LocalFlowController(Http2Connection connection, float windowUpdateRatio) {
+ super(connection, windowUpdateRatio, true);
+ }
+
+ @Override
+ public void receiveFlowControlledFrame(Http2Stream stream, ByteBuf data, int padding, boolean endOfStream)
+ throws Http2Exception {
+ super.receiveFlowControlledFrame(stream, data, padding, endOfStream);
+ }
Review Comment:
The overridden receiveFlowControlledFrame method simply calls super without
any additional logic. This override serves no purpose and should be removed to
reduce unnecessary code.
```suggestion
```
##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2TripleClientStream.java:
##########
@@ -72,4 +90,68 @@ public void handlerAdded(ChannelHandlerContext ctx) {
writeQueue.enqueue(CreateStreamQueueCommand.create(bootstrap,
streamChannelFuture));
return streamChannelFuture;
}
+
+ @Override
+ protected void consumeBytes(int numBytes) {
+ if (numBytes <= 0) {
+ return;
+ }
+
+ // todo The current implementation is not optimal, and alternative implementations should be considered.
+
+ Channel streamChannel = getStreamChannelFuture().getNow();
+ if (!(streamChannel instanceof Http2StreamChannel)) {
+ return;
+ }
+
+ Http2StreamChannel http2StreamChannel = (Http2StreamChannel) streamChannel;
+
Review Comment:
Potential race condition: getStreamChannelFuture().getNow() returns null if
the future hasn't completed yet. The instanceof check evaluates to false for
null, so the method returns without an error, but the bytes are then never
reported to the flow controller. Consider handling the null case explicitly,
for example by deferring consumeBytes until the channel is available.
```suggestion
TripleStreamChannelFuture streamChannelFuture = getStreamChannelFuture();
Channel streamChannel = streamChannelFuture.getNow();
if (streamChannel == null) {
// Stream channel not ready yet; defer consumeBytes until it becomes available.
streamChannelFuture.addListener(future -> {
Channel ch = streamChannelFuture.getNow();
if (ch instanceof Http2StreamChannel) {
doConsumeBytes((Http2StreamChannel) ch, numBytes);
}
});
return;
}
if (!(streamChannel instanceof Http2StreamChannel)) {
return;
}
Http2StreamChannel http2StreamChannel = (Http2StreamChannel) streamChannel;
doConsumeBytes(http2StreamChannel, numBytes);
}
private void doConsumeBytes(Http2StreamChannel http2StreamChannel, int numBytes) {
```
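The deferral pattern in the suggestion can be illustrated without Netty. The sketch below substitutes a JDK `CompletableFuture` for Dubbo's `TripleStreamChannelFuture` (a different type with a different API) and a plain counter for the flow controller; `DeferredConsume` and its members are hypothetical names chosen for illustration only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class DeferredConsume {
    // Stand-in for the bytes credited back to the flow-control window.
    final AtomicInteger consumed = new AtomicInteger();

    // Stand-in for the stream channel future; completes once the channel exists.
    final CompletableFuture<Object> channelFuture = new CompletableFuture<>();

    void consumeBytes(int numBytes) {
        if (numBytes <= 0) {
            return;
        }
        Object channel = channelFuture.getNow(null);
        if (channel == null) {
            // Channel not ready yet: defer instead of silently dropping the credit.
            // If the future completes in the meantime, thenAccept runs the callback right away.
            channelFuture.thenAccept(ch -> doConsumeBytes(ch, numBytes));
            return;
        }
        doConsumeBytes(channel, numBytes);
    }

    private void doConsumeBytes(Object channel, int numBytes) {
        consumed.addAndGet(numBytes);
    }
}
```

Bytes reported before the channel is available are credited as soon as the future completes, rather than being lost.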
##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java:
##########
@@ -32,6 +32,13 @@ public interface ClientCall {
*/
interface Listener {
+ /**
+ * Whether the response is streaming response.
+ *
+ * @return
Review Comment:
The Javadoc comment has an incomplete @return tag with no description.
Either add the description or remove the @return tag if it follows a convention
where boolean return types are self-explanatory.
```suggestion
* @return true if the response is a streaming response
```
##########
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyH2StreamChannel.java:
##########
@@ -89,4 +103,40 @@ public CompletableFuture<Void> writeResetFrame(long
errorCode) {
http2StreamChannel.write(resetFrame).addListener(nettyHttpChannelFutureListener);
return nettyHttpChannelFutureListener;
}
+
+ @Override
+ public void consumeBytes(int numBytes) throws Exception {
+ if (numBytes <= 0) {
+ return;
+ }
+
+ if (http2Connection == null) {
+ LOGGER.debug("Http2Connection not available, skip consumeBytes");
+ return;
+ }
+
+ Http2LocalFlowController localFlowController = http2Connection.local().flowController();
+
+ // Get the stream from connection using stream id
+ int streamId = http2StreamChannel.stream().id();
Review Comment:
Potential NullPointerException: If http2StreamChannel.stream() returns null
at line 121, the code will throw a NullPointerException. While there's a check
for the stream being null after retrieving it from the connection (line 123),
the stream().id() call at line 121 could fail first if stream() itself returns
null. Consider checking if stream() is null before calling id().
```suggestion
// Get the stream from Http2StreamChannel and then from connection using stream id
Http2Stream channelStream = http2StreamChannel.stream();
if (channelStream == null) {
LOGGER.debug("Http2StreamChannel.stream() is null, skip consumeBytes");
return;
}
int streamId = channelStream.id();
```
##########
dubbo-common/src/main/java/org/apache/dubbo/config/nested/TripleConfig.java:
##########
@@ -355,6 +367,19 @@ public void setMaxMessageSize(Integer maxMessageSize) {
this.maxMessageSize = maxMessageSize;
}
+ public Float getWindowUpdateRatio() {
+ return windowUpdateRatio;
+ }
+
+ @Parameter(excluded = true)
+ public float getWindowUpdateRatioOrDefault() {
+ return windowUpdateRatio == null ? DEFAULT_WINDOW_UPDATE_RATIO : windowUpdateRatio;
+ }
+
+ public void setWindowUpdateRatio(Float windowUpdateRatio) {
Review Comment:
The validation for windowUpdateRatio is missing. The documentation states it
must be between 0 (exclusive) and 1 (inclusive), but there's no validation in
the setter to enforce this range. Consider adding validation to prevent invalid
values.
```suggestion
public void setWindowUpdateRatio(Float windowUpdateRatio) {
if (windowUpdateRatio != null && (windowUpdateRatio <= 0.0f || windowUpdateRatio > 1.0f)) {
throw new IllegalArgumentException("windowUpdateRatio must be > 0 and <= 1, but was: " + windowUpdateRatio);
}
```
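A self-contained sketch of the validated setter, for illustration. `WindowUpdateRatioHolder` is a hypothetical class, not `TripleConfig`, and the default of 0.5 is an assumption taken from the "50% of initial window" wording in the flow controller Javadoc. The range check is written in negated form so that `Float.NaN`, which fails every comparison and would slip past a `<= 0 || > 1` test, is rejected too.

```java
class WindowUpdateRatioHolder {
    // Assumed default, matching the "50% of initial window" described in the Javadoc.
    static final float DEFAULT_WINDOW_UPDATE_RATIO = 0.5f;

    private Float windowUpdateRatio;

    void setWindowUpdateRatio(Float ratio) {
        // Negated range check: rejects values outside (0, 1] and also NaN.
        if (ratio != null && !(ratio > 0.0f && ratio <= 1.0f)) {
            throw new IllegalArgumentException(
                    "windowUpdateRatio must be > 0 and <= 1, but was: " + ratio);
        }
        this.windowUpdateRatio = ratio;
    }

    float getWindowUpdateRatioOrDefault() {
        return windowUpdateRatio == null ? DEFAULT_WINDOW_UPDATE_RATIO : windowUpdateRatio;
    }
}
```

Passing `null` remains valid and resets the holder to the default.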
##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java:
##########
@@ -89,6 +96,8 @@ interface Listener {
*/
boolean isAutoRequest();
Review Comment:
Missing Javadoc comment for the setAutoRequestWithInitial method. This new
API method should have proper documentation explaining what it does, the
purpose of the initialRequest parameter, and how it differs from setAutoRequest.
```suggestion
/**
* Enable auto request for this call with an initial number of messages to request.
* <p>
* This variant of auto request allows specifying how many response messages should be
* requested from the server immediately when the call starts or auto request is enabled.
* It is similar to {@link #setAutoRequest(boolean)} but also configures the initial
* {@link #request(int) request} amount.
*
* @param initialRequest the initial number of messages to request from the server
*/
```
##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/observer/ClientCallToObserverAdapter.java:
##########
@@ -23,10 +23,12 @@
public class ClientCallToObserverAdapter<T> extends CancelableStreamObserver<T> implements ClientStreamObserver<T> {
private final ClientCall call;
+ private final boolean streamingResponse;
private boolean terminated;
- public ClientCallToObserverAdapter(ClientCall call) {
+ public ClientCallToObserverAdapter(ClientCall call, boolean streamingResponse) {
this.call = call;
+ this.streamingResponse = streamingResponse;
Review Comment:
The field streamingResponse in ClientCallToObserverAdapter is assigned in the
constructor but never read. Either remove the field or implement the intended
functionality that requires it.
##########
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/message/LengthFieldStreamingDecoder.java:
##########
@@ -184,7 +184,20 @@ private void skipOffset(InputStream inputStream, int
lengthFieldOffset) throws I
}
private void processBody() throws IOException {
- byte[] rawMessage = readRawMessage(accumulate, requiredLength);
+ // Calculate total bytes read: header (offset + length field) + payload
+ int totalBytesRead = lengthFieldOffset + lengthFieldLength + requiredLength;
+
+ byte[] rawMessage;
+ try {
+ rawMessage = readRawMessage(accumulate, requiredLength);
+ } finally {
+ // Notify listener about bytes read for flow control immediately after reading bytes
+ // This must be in finally block to ensure flow control works even if reading fails
+ // Following gRPC's pattern: bytesRead is called as soon as bytes are consumed from input
+ listener.bytesRead(totalBytesRead);
+ }
+
Review Comment:
The bytesRead notification in the finally block will be called even if an
exception occurs during readRawMessage. However, if readRawMessage fails
partway through reading the buffer, the totalBytesRead value may not accurately
reflect the actual bytes consumed. Consider whether this is the intended
behavior or if bytesRead should only be called on successful completion.
```suggestion
byte[] rawMessage = readRawMessage(accumulate, requiredLength);
// Notify listener about bytes read for flow control immediately after successfully reading bytes
// Following gRPC's pattern: bytesRead is called as soon as bytes are consumed from input
listener.bytesRead(totalBytesRead);
```
##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2TripleClientStream.java:
##########
@@ -72,4 +90,68 @@ public void handlerAdded(ChannelHandlerContext ctx) {
writeQueue.enqueue(CreateStreamQueueCommand.create(bootstrap,
streamChannelFuture));
return streamChannelFuture;
}
+
+ @Override
+ protected void consumeBytes(int numBytes) {
+ if (numBytes <= 0) {
+ return;
+ }
+
+ // todo The current implementation is not optimal, and alternative implementations should be considered.
+
+ Channel streamChannel = getStreamChannelFuture().getNow();
+ if (!(streamChannel instanceof Http2StreamChannel)) {
+ return;
+ }
+
+ Http2StreamChannel http2StreamChannel = (Http2StreamChannel) streamChannel;
+
+ // Get Http2Connection from parent channel pipeline
+ Http2Connection http2Connection = getHttp2Connection();
+ if (http2Connection == null) {
+ LOGGER.debug("Http2Connection not available for flow control");
+ return;
+ }
+
+ Http2LocalFlowController localFlowController = http2Connection.local().flowController();
+ int streamId = http2StreamChannel.stream().id();
Review Comment:
Potential NullPointerException: if http2StreamChannel.stream() returns null,
the subsequent call to id() will throw a NullPointerException. A null result
from getStreamChannelFuture().getNow() is already covered, because instanceof
evaluates to false for null and the method returns early, but stream() is
dereferenced without a check. Add a null check before calling id().
##########
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/netty4/h2/NettyH2StreamChannel.java:
##########
@@ -89,4 +103,40 @@ public CompletableFuture<Void> writeResetFrame(long
errorCode) {
http2StreamChannel.write(resetFrame).addListener(nettyHttpChannelFutureListener);
return nettyHttpChannelFutureListener;
}
+
+ @Override
+ public void consumeBytes(int numBytes) throws Exception {
+ if (numBytes <= 0) {
+ return;
+ }
+
+ if (http2Connection == null) {
+ LOGGER.debug("Http2Connection not available, skip consumeBytes");
+ return;
+ }
+
+ Http2LocalFlowController localFlowController = http2Connection.local().flowController();
+
+ // Get the stream from connection using stream id
+ int streamId = http2StreamChannel.stream().id();
+ Http2Stream stream = http2Connection.stream(streamId);
+ if (stream == null) {
+ LOGGER.debug("Stream {} not found in connection, skip consumeBytes", streamId);
+ return;
+ }
+
+ // Consume bytes to trigger WINDOW_UPDATE frame
+ // This must be executed in the event loop thread
+ if (http2StreamChannel.eventLoop().inEventLoop()) {
+ localFlowController.consumeBytes(stream, numBytes);
+ } else {
+ http2StreamChannel.eventLoop().execute(() -> {
+ try {
+ localFlowController.consumeBytes(stream, numBytes);
+ } catch (Exception e) {
+ LOGGER.warn(PROTOCOL_FAILED_RESPONSE, "", "", "Failed to consumeBytes for stream " + streamId, e);
+ }
+ });
Review Comment:
Inconsistent error handling: when called from the event loop (line 131),
exceptions from consumeBytes propagate to the caller (the method declares
throws Exception), but when the call is scheduled asynchronously (line 135),
exceptions are caught and logged. The behavior therefore depends on which
thread calls the method. Consider handling the exception the same way in both
cases. Note that Http2LocalFlowController.consumeBytes declares the checked
Http2Exception, so the asynchronous path cannot simply propagate it; the
one-line lambda in the suggestion below would need a try/catch to compile.
```suggestion
http2StreamChannel.eventLoop().execute(() -> localFlowController.consumeBytes(stream, numBytes));
```
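One way to make both paths behave the same is to wrap the call in a single task that catches the exception, then either run it inline or hand it to the executor. The sketch below is a hedged illustration using JDK types only: `CheckedConsumer` and `ConsistentConsume` are hypothetical names, and `Executor` stands in for the Netty event loop.

```java
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical stand-in for a flow controller whose consumeBytes throws a checked
// exception, as Netty's Http2LocalFlowController#consumeBytes does with Http2Exception.
interface CheckedConsumer {
    void consumeBytes(int numBytes) throws Exception;
}

class ConsistentConsume {
    // Runs the consume inline or on the executor; failures reach the same handler either way.
    static void consume(CheckedConsumer controller, int numBytes, boolean inEventLoop,
                        Executor eventLoop, Consumer<Exception> onError) {
        Runnable task = () -> {
            try {
                controller.consumeBytes(numBytes);
            } catch (Exception e) {
                // A Runnable cannot throw checked exceptions, so report it here.
                onError.accept(e);
            }
        };
        if (inEventLoop) {
            task.run();
        } else {
            eventLoop.execute(task);
        }
    }
}
```

The caller then sees identical behavior regardless of the calling thread; whether logging or some other handler is appropriate is left to the `onError` callback.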
##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientStreamObserver.java:
##########
@@ -30,4 +30,6 @@ public interface ClientStreamObserver<T> extends
CallStreamObserver<T> {
default void disableAutoRequest() {
disableAutoFlowControl();
}
+
Review Comment:
Missing Javadoc comment for the disableAutoRequestWithInitial method. This
new API method should have proper documentation explaining what it does, the
purpose of the request parameter, and how it differs from disableAutoRequest or
disableAutoFlowControl.
```suggestion
/**
* Swaps to manual flow control and configures an initial number of messages to request
* when the call starts.
* <p>
* This is similar to {@link #disableAutoRequest()} but allows specifying how many messages
* should be requested immediately once the call is established, as if {@link #request(int)}
* were invoked with the given {@code request} value after disabling automatic flow control.
*
* @param request the number of messages to request initially after auto flow control has been
* disabled; must be a non-negative number.
*/
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]