This is an automated email from the ASF dual-hosted git repository.
zrlw pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 5aa764c07e fix tri backpressure race condition (#16004)
5aa764c07e is described below
commit 5aa764c07e917c89849e17b5602a06caf9aeaf3a
Author: earthchen <[email protected]>
AuthorDate: Fri Jan 16 16:47:10 2026 +0800
fix tri backpressure race condition (#16004)
* fix tri backpressure race condition
* add business ready check
* fix: enhance onReady trigger mechanism to prevent race conditions
* fix: improve onReady notification mechanism to prevent race conditions
* fix
* mvn spotless:apply
* fix: resolve race condition in onReady notification by using isReady
method
* Update
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
Co-authored-by: Copilot <[email protected]>
* Update
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
Co-authored-by: Copilot <[email protected]>
* Update
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
Co-authored-by: Copilot <[email protected]>
* Revert "Update
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java"
This reverts commit 8dd204e82b6150af32061a29fd8706b33c9aeb86.
---------
Co-authored-by: Copilot <[email protected]>
---
.../rpc/protocol/tri/call/TripleClientCall.java | 6 +++-
.../tri/command/InitOnReadyQueueCommand.java | 19 ++++++-----
.../tri/stream/AbstractTripleClientStream.java | 38 +++++++++++++++++++---
3 files changed, 49 insertions(+), 14 deletions(-)
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
index 12e56e2ee0..c4de49e0f1 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
@@ -168,7 +168,11 @@ public class TripleClientCall implements ClientCall,
ClientStream.Listener {
if (listener == null) {
return;
}
- // ObserverToClientCallListenerAdapter.onReady() triggers the
onReadyHandler
+ // ObserverToClientCallListenerAdapter.onReady() triggers the
onReadyHandler.
+ // Note: We do NOT check isReady() here because of the async dispatch
model.
+ // The handler is always called (following gRPC's "spurious
notifications" semantics),
+ // and it should check isReady() internally via while(isReady()) {
send(); }.
+ // Subsequent channelWritabilityChanged events will trigger onReady()
again if needed.
executor.execute(() -> {
try {
listener.onReady();
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/InitOnReadyQueueCommand.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/InitOnReadyQueueCommand.java
index 61505532d0..df7159c4f1 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/InitOnReadyQueueCommand.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/command/InitOnReadyQueueCommand.java
@@ -16,7 +16,7 @@
*/
package org.apache.dubbo.rpc.protocol.tri.command;
-import org.apache.dubbo.rpc.protocol.tri.stream.ClientStream;
+import org.apache.dubbo.rpc.protocol.tri.stream.AbstractTripleClientStream;
import org.apache.dubbo.rpc.protocol.tri.stream.TripleStreamChannelFuture;
import io.netty.channel.Channel;
@@ -36,18 +36,18 @@ public class InitOnReadyQueueCommand extends QueuedCommand {
private final TripleStreamChannelFuture streamChannelFuture;
- private final ClientStream.Listener listener;
+ private final AbstractTripleClientStream stream;
- private InitOnReadyQueueCommand(TripleStreamChannelFuture
streamChannelFuture, ClientStream.Listener listener) {
+ private InitOnReadyQueueCommand(TripleStreamChannelFuture
streamChannelFuture, AbstractTripleClientStream stream) {
this.streamChannelFuture = streamChannelFuture;
- this.listener = listener;
+ this.stream = stream;
this.promise(streamChannelFuture.getParentChannel().newPromise());
this.channel(streamChannelFuture.getParentChannel());
}
public static InitOnReadyQueueCommand create(
- TripleStreamChannelFuture streamChannelFuture,
ClientStream.Listener listener) {
- return new InitOnReadyQueueCommand(streamChannelFuture, listener);
+ TripleStreamChannelFuture streamChannelFuture,
AbstractTripleClientStream stream) {
+ return new InitOnReadyQueueCommand(streamChannelFuture, stream);
}
@Override
@@ -59,9 +59,10 @@ public class InitOnReadyQueueCommand extends QueuedCommand {
public void run(Channel channel) {
// Work in I/O thread, after CreateStreamQueueCommand has completed
Channel streamChannel = streamChannelFuture.getNow();
- if (streamChannel != null && streamChannel.isWritable()) {
- // Trigger initial onReady to allow application to start sending.
- listener.onReady();
+ if (streamChannel != null) {
+ // Trigger the initial onReady through the stream, which
+ // updates lastReadyState and notifies the listener.
+ stream.triggerInitialOnReady();
}
}
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
index d377a774c0..6c40a3f9a4 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java
@@ -86,6 +86,11 @@ public abstract class AbstractTripleClientStream extends
AbstractStream implemen
private boolean isReturnTriException = false;
+ /**
+ * Tracks the last known ready state to detect when the state changes from
"not ready" to "ready".
+ */
+ private volatile boolean lastReadyState = false;
+
protected AbstractTripleClientStream(
FrameworkModel frameworkModel,
Executor executor,
@@ -122,7 +127,7 @@ public abstract class AbstractTripleClientStream extends
AbstractStream implemen
* This is necessary because onReady is only triggered by
channelWritabilityChanged,
* which won't fire if the channel is always writable from creation.
*/
-
writeQueue.enqueue(InitOnReadyQueueCommand.create(tripleStreamChannelFuture,
listener));
+
writeQueue.enqueue(InitOnReadyQueueCommand.create(tripleStreamChannelFuture,
this));
return tripleStreamChannelFuture;
}
@@ -192,6 +197,9 @@ public abstract class AbstractTripleClientStream extends
AbstractStream implemen
.withDescription("Client write message failed")
.withCause(future.cause()));
transportException(future.cause());
+ } else {
+ // After successful write, check if we need to trigger onReady
+ notifyOnReady(false);
}
});
}
@@ -254,9 +262,31 @@ public abstract class AbstractTripleClientStream extends
AbstractStream implemen
* asynchronously triggering all necessary callbacks through its executor.
*/
protected void onWritabilityChanged() {
- Channel channel = streamChannelFuture.getNow();
- if (channel != null && channel.isWritable()) {
- // Synchronously call listener.onReady(), which will use executor
to run the callback
+ notifyOnReady(false);
+ }
+
+ /**
+ * Called by InitOnReadyQueueCommand to trigger the initial onReady
notification.
+ */
+ public void triggerInitialOnReady() {
+ notifyOnReady(true);
+ }
+
+ /**
+ * Notifies the listener when the stream becomes ready.
+ *
+ * @param forceNotify if true, always trigger onReady (for initial
notification);
+ * if false, only trigger when state changes from "not
ready" to "ready"
+ */
+ private synchronized void notifyOnReady(boolean forceNotify) {
+ boolean wasReady = lastReadyState;
+ boolean isNowReady = isReady();
+ lastReadyState = isNowReady;
+
+ // Trigger onReady if:
+ // 1. forceNotify is true (initial notification, spurious is OK), or
+ // 2. state changes from "not ready" to "ready"
+ if (forceNotify || (!wasReady && isNowReady)) {
listener.onReady();
}
}