Copilot commented on code in PR #16028:
URL: https://github.com/apache/dubbo/pull/16028#discussion_r2710917819
##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java:
##########
@@ -194,20 +201,64 @@ public ChannelFuture sendMessage(byte[] message, int compressFlag) {
if (!checkResult.isSuccess()) {
return checkResult;
}
+
+ final int messageSize = message.length;
+ onSendingBytes(messageSize);
+
final DataQueueCommand cmd =
DataQueueCommand.create(streamChannelFuture, message, false, compressFlag);
return this.writeQueue.enqueueFuture(cmd, parent.eventLoop()).addListener(future -> {
if (!future.isSuccess()) {
+ rollbackSendingBytes(messageSize);
cancelByLocal(TriRpcStatus.INTERNAL
.withDescription("Client write message failed")
.withCause(future.cause()));
transportException(future.cause());
} else {
- // After successful write, check if we need to trigger onReady
- notifyOnReady(false);
+ onSentBytes(messageSize);
}
});
}
+ /**
+ * Called before bytes are sent to track pending bytes.
+ *
+ * @param numBytes the number of bytes about to be sent
+ */
+ protected void onSendingBytes(int numBytes) {
+ numSentBytesQueued.addAndGet(numBytes);
+ }
+
+ /**
+ * Called when sending fails to rollback the pending bytes count.
+ *
+ * @param numBytes the number of bytes to rollback
+ */
+ protected void rollbackSendingBytes(int numBytes) {
+ numSentBytesQueued.addAndGet(-numBytes);
+ }
+
+ /**
+ * Called when bytes have been successfully sent to the remote endpoint.
+ *
+ * @param numBytes the number of bytes that were sent
+ */
+ protected void onSentBytes(int numBytes) {
+ long oldValue = numSentBytesQueued.getAndAdd(-numBytes);
+ long newValue = oldValue - numBytes;
Review Comment:
`getAndAdd(-numBytes)` returns the old value, so `newValue` has to be
recomputed by hand on the following line (`oldValue - numBytes`). Repeating the
subtraction outside the atomic call could lead to confusion. Consider
`addAndGet(-numBytes)`, which returns the new value directly from the atomic
operation and lets the old value be derived from it, or add a comment
explaining why both values are needed.
```suggestion
long newValue = numSentBytesQueued.addAndGet(-numBytes);
long oldValue = newValue + numBytes;
```
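As a minimal sketch of why the two forms are interchangeable: both perform a single atomic update and derive the other value from its return, so they yield the same (old, new) pair. The class and method names below are hypothetical and exist only for illustration; they are not part of the PR.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical demo class, not part of the Dubbo code base.
public class AtomicDeltaDemo {

    // Form currently in the PR: read the old value atomically, derive the new one.
    static long[] viaGetAndAdd(AtomicLong counter, int numBytes) {
        long oldValue = counter.getAndAdd(-numBytes);
        long newValue = oldValue - numBytes;
        return new long[] {oldValue, newValue};
    }

    // Form from the suggestion: read the new value atomically, derive the old one.
    static long[] viaAddAndGet(AtomicLong counter, int numBytes) {
        long newValue = counter.addAndGet(-numBytes);
        long oldValue = newValue + numBytes;
        return new long[] {oldValue, newValue};
    }

    public static void main(String[] args) {
        long[] a = viaGetAndAdd(new AtomicLong(100), 30);
        long[] b = viaAddAndGet(new AtomicLong(100), 30);
        System.out.println(a[0] + " -> " + a[1]); // prints "100 -> 70"
        System.out.println(b[0] + " -> " + b[1]); // prints "100 -> 70"
    }
}
```

Either way only one atomic operation touches the counter, so the choice is about readability rather than correctness.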
##########
dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ServerChannelObserver.java:
##########
@@ -67,20 +111,92 @@ public void setOnReadyHandler(Runnable onReadyHandler) {
}
/**
- * Called when the channel writability changes.
- * Triggers the onReadyHandler if the channel is now writable.
+ * Called by the transport layer when the underlying channel's writability changes.
+ * <p>
+ * This serves as an additional trigger point for notifying the {@code onReadyHandler}
+ * when the channel becomes writable again. The actual ready state is still determined
+ * by the byte counting mechanism in {@link #isReady()}.
*/
public void onWritabilityChanged() {
- Runnable handler = this.onReadyHandler;
- if (handler != null && isReady()) {
- handler.run();
+ if (isReady()) {
+ notifyOnReady();
}
}
public void setStreamingDecoder(StreamingDecoder streamingDecoder) {
this.streamingDecoder = streamingDecoder;
}
+ /**
+ * Override to add byte counting for backpressure support.
+ */
+ @Override
+ protected CompletableFuture<Void> sendMessage(HttpOutputMessage message) throws Throwable {
+ if (message == null) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ int messageSize = message.messageSize();
+ onSendingBytes(messageSize);
+
+ CompletableFuture<Void> future = super.sendMessage(message);
+
+ future.whenComplete((v, t) -> {
+ if (t == null) {
+ onSentBytes(messageSize);
+ } else {
+ rollbackSendingBytes(messageSize);
+ }
+ });
+
+ return future;
+ }
+
+ /**
+ * Called before bytes are sent to track pending bytes.
+ */
+ protected void onSendingBytes(int numBytes) {
+ numSentBytesQueued.addAndGet(numBytes);
+ }
+
+ /**
+ * Called when sending fails to rollback the pending bytes count.
+ */
+ protected void rollbackSendingBytes(int numBytes) {
+ numSentBytesQueued.addAndGet(-numBytes);
+ }
+
+ /**
+ * Called when bytes have been successfully sent to the remote endpoint.
+ */
+ protected void onSentBytes(int numBytes) {
+ long oldValue = numSentBytesQueued.getAndAdd(-numBytes);
+ long newValue = oldValue - numBytes;
+ // Trigger onReady when transitioning from "not ready" to "ready"
+ if (oldValue >= ON_READY_THRESHOLD && newValue < ON_READY_THRESHOLD) {
Review Comment:
`getAndAdd(-numBytes)` already returns the old value, and `newValue` is used
only in this threshold check, so the separate `long newValue = oldValue - numBytes;`
local adds little and could lead to confusion. Consider inlining the
subtraction into the condition, or adding a comment explaining why both values
are needed. If the suggestion below is applied, the `newValue` declaration
above it becomes unused and should be removed as well.
```suggestion
// Trigger onReady when transitioning from "not ready" to "ready"
if (oldValue >= ON_READY_THRESHOLD && oldValue - numBytes < ON_READY_THRESHOLD) {
```
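For context, a minimal sketch of the byte-counting transition check that this condition implements. It assumes a threshold of 32 KiB and replaces `notifyOnReady()` with a simple invocation counter; the class name, the threshold value, and the `notifications()` accessor are hypothetical and are not taken from the PR.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the observer's byte-counting logic.
public class ReadyThresholdSketch {
    // Assumed value for illustration; the real constant is defined in the PR.
    static final long ON_READY_THRESHOLD = 32 * 1024;

    private final AtomicLong numSentBytesQueued = new AtomicLong();
    // Stand-in for notifyOnReady(): counts how often it would have fired.
    private final AtomicInteger onReadyNotifications = new AtomicInteger();

    // Bytes handed to the transport but not yet written.
    void onSendingBytes(int numBytes) {
        numSentBytesQueued.addAndGet(numBytes);
    }

    boolean isReady() {
        return numSentBytesQueued.get() < ON_READY_THRESHOLD;
    }

    void onSentBytes(int numBytes) {
        long oldValue = numSentBytesQueued.getAndAdd(-numBytes);
        // Fire only on the transition from "not ready" to "ready".
        if (oldValue >= ON_READY_THRESHOLD && oldValue - numBytes < ON_READY_THRESHOLD) {
            onReadyNotifications.incrementAndGet();
        }
    }

    int notifications() {
        return onReadyNotifications.get();
    }

    public static void main(String[] args) {
        ReadyThresholdSketch s = new ReadyThresholdSketch();
        s.onSendingBytes(40_000);              // above the assumed threshold
        System.out.println(s.isReady());       // prints "false"
        s.onSentBytes(40_000);                 // crosses back below the threshold
        System.out.println(s.notifications()); // prints "1"
    }
}
```

Because the check compares the values on both sides of one atomic update, a write that starts and ends below the threshold does not trigger a notification.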