Copilot commented on code in PR #16004:
URL: https://github.com/apache/dubbo/pull/16004#discussion_r2693496578


##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java:
##########
@@ -254,9 +263,31 @@ public boolean isReady() {
      * asynchronously triggering all necessary callbacks through its executor.
      */
     protected void onWritabilityChanged() {
-        Channel channel = streamChannelFuture.getNow();
-        if (channel != null && channel.isWritable()) {
-            // Synchronously call listener.onReady(), which will use executor to run the callback
+        notifyOnReady(false);
+    }
+
+    /**
+     * Called by InitOnReadyQueueCommand to trigger the initial onReady notification.
+     */
+    public void triggerInitialOnReady() {
+        notifyOnReady(true);
+    }
+
+    /**
+     * notify listener when stream becomes ready
+     *
+     * @param forceNotify if true, always trigger onReady (for initial notification);
+     *                    if false, only trigger when state changes from "not ready" to "ready"
+     */
+    private void notifyOnReady(boolean forceNotify) {
+        boolean wasReady = lastReadyState;
+        boolean isNowReady = isReady();
+        lastReadyState = isNowReady;
+
+        // Trigger onReady if:
+        // 1. forceNotify is true (initial notification, spurious is OK), or
+        // 2. state changes from "not ready" to "ready"
+        if (forceNotify || (!wasReady && isNowReady)) {
             listener.onReady();
         }
     }

Review Comment:
   This change adds logic to handle a backpressure race condition, but it 
includes no new tests verifying that the race is actually fixed. Please add 
tests confirming that concurrent calls to notifyOnReady track state transitions 
correctly and produce neither duplicate nor lost notifications, especially 
when onWritabilityChanged and sendMessage callbacks race.
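
One possible shape for such a test is sketched below. It is a minimal sketch, not Dubbo code: `ReadyTransitionModel` and `raceNotifications` are hypothetical names, the model only copies the transition check from the diff, and it assumes `notifyOnReady` is guarded by `synchronized`. Several threads race on a stream that is already ready, and the check is that only the first "not ready" to "ready" transition produces a notification.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model of the transition logic in the diff; not the real AbstractTripleClientStream. */
final class ReadyTransitionModel {
    private boolean lastReadyState = false;   // guarded by "this"
    private volatile boolean ready = false;   // stands in for isReady()
    final AtomicInteger onReadyCalls = new AtomicInteger();

    void setReady(boolean value) {
        ready = value;
    }

    // Assumption: the whole read-modify-write sequence runs under one lock.
    synchronized void notifyOnReady(boolean forceNotify) {
        boolean wasReady = lastReadyState;
        boolean isNowReady = ready;
        lastReadyState = isNowReady;
        if (forceNotify || (!wasReady && isNowReady)) {
            onReadyCalls.incrementAndGet();   // stands in for listener.onReady()
        }
    }

    /** Races several callers against one already-ready model and returns the onReady count. */
    static int raceNotifications(int threads, int callsPerThread) {
        ReadyTransitionModel model = new ReadyTransitionModel();
        model.setReady(true);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch start = new CountDownLatch(1);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                try {
                    start.await();            // release all workers at once
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                for (int i = 0; i < callsPerThread; i++) {
                    model.notifyOnReady(false);
                }
            });
        }
        start.countDown();
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return model.onReadyCalls.get();
    }
}
```

With the lock in place, `ReadyTransitionModel.raceNotifications(8, 10_000)` should return 1. Removing `synchronized` from the model makes duplicate notifications possible, although a given run may not reproduce them.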



##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java:
##########
@@ -86,6 +86,12 @@ public abstract class AbstractTripleClientStream extends AbstractStream implemen
 
     private boolean isReturnTriException = false;
 
+    /**
+     * Tracks the last known ready state for detecting state transitions.
+     * when the state changes from "not ready" to "ready".

Review Comment:
   The JavaDoc comment is incomplete. Its second line, "when the state changes 
from 'not ready' to 'ready'.", is a sentence fragment that is never connected 
to the first sentence. It should be folded into one complete sentence 
explaining what the field tracks.
   ```suggestion
        * Tracks the last known ready state to detect when the state changes from "not ready" to "ready".
   ```



##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java:
##########
@@ -254,9 +263,31 @@ public boolean isReady() {
      * asynchronously triggering all necessary callbacks through its executor.
      */
     protected void onWritabilityChanged() {
-        Channel channel = streamChannelFuture.getNow();
-        if (channel != null && channel.isWritable()) {
-            // Synchronously call listener.onReady(), which will use executor to run the callback
+        notifyOnReady(false);
+    }
+
+    /**
+     * Called by InitOnReadyQueueCommand to trigger the initial onReady notification.
+     */
+    public void triggerInitialOnReady() {
+        notifyOnReady(true);
+    }
+
+    /**
+     * notify listener when stream becomes ready
+     *
+     * @param forceNotify if true, always trigger onReady (for initial notification);
+     *                    if false, only trigger when state changes from "not ready" to "ready"
+     */
+    private void notifyOnReady(boolean forceNotify) {

Review Comment:
   The notifyOnReady method has a race condition in the read-modify-write 
sequence. Multiple threads could read the same value of lastReadyState, compute 
isReady(), and then update lastReadyState, leading to lost state transitions or 
duplicate notifications. Since this method can be called from different threads 
(the EventLoop thread via onWritabilityChanged, the sendMessage callback 
thread, and the InitOnReadyQueueCommand), the entire sequence should be 
synchronized or use atomic compare-and-set operations to ensure thread safety.
   ```suggestion
       private synchronized void notifyOnReady(boolean forceNotify) {
   ```
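
For comparison, the atomic alternative mentioned above could look roughly like the sketch below. `ReadyNotifier` is a hypothetical stand-in, not Dubbo code, and the `isReady` and `onReady` hooks are assumptions replacing `isReady()` and `listener.onReady()`. `AtomicBoolean.getAndSet` makes the read and update of the tracked state a single atomic step, so two callers cannot both observe the same "not ready" value. It does not make sampling `isReady` atomic with that update, so a stale sample can still be published; the `synchronized` suggestion covers both steps.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

/** Hypothetical stand-in for the stream's ready tracking; not the real Dubbo class. */
final class ReadyNotifier {
    private final AtomicBoolean lastReadyState = new AtomicBoolean(false);
    private final BooleanSupplier isReady;   // assumption: replaces isReady()
    private final Runnable onReady;          // assumption: replaces listener.onReady()

    ReadyNotifier(BooleanSupplier isReady, Runnable onReady) {
        this.isReady = isReady;
        this.onReady = onReady;
    }

    void notifyOnReady(boolean forceNotify) {
        boolean isNowReady = isReady.getAsBoolean();
        // Atomically store the new state and fetch the previous one, so only
        // one caller can observe a given "not ready" to "ready" transition.
        boolean wasReady = lastReadyState.getAndSet(isNowReady);
        if (forceNotify || (!wasReady && isNowReady)) {
            onReady.run();
        }
    }
}
```

The lock-free form avoids holding a monitor while the listener runs, at the cost of the weaker guarantee described above.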



##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/AbstractTripleClientStream.java:
##########
@@ -254,9 +263,31 @@ public boolean isReady() {
      * asynchronously triggering all necessary callbacks through its executor.
      */
     protected void onWritabilityChanged() {
-        Channel channel = streamChannelFuture.getNow();
-        if (channel != null && channel.isWritable()) {
-            // Synchronously call listener.onReady(), which will use executor to run the callback
+        notifyOnReady(false);
+    }
+
+    /**
+     * Called by InitOnReadyQueueCommand to trigger the initial onReady notification.
+     */
+    public void triggerInitialOnReady() {

Review Comment:
   The triggerInitialOnReady method should be package-private or protected 
rather than public since it's only called by InitOnReadyQueueCommand within the 
same package. Public visibility exposes internal stream state management that 
should not be accessible to external clients.
   ```suggestion
       void triggerInitialOnReady() {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

