This is an automated email from the ASF dual-hosted git repository.

oxsean pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 07e1c2a9c7 fix mutiny race condition issue (#15662)
07e1c2a9c7 is described below

commit 07e1c2a9c723593e9078eb6e0099ad6482ca0c15
Author: Moritz Arena <[email protected]>
AuthorDate: Wed Sep 3 15:55:59 2025 +0800

    fix mutiny race condition issue (#15662)
---
 .../apache/dubbo/mutiny/MutinyClientCallsTest.java | 47 ++++++++++++++++------
 1 file changed, 35 insertions(+), 12 deletions(-)

diff --git a/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/MutinyClientCallsTest.java b/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/MutinyClientCallsTest.java
index 4863b15640..22e48a13e4 100644
--- a/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/MutinyClientCallsTest.java
+++ b/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/MutinyClientCallsTest.java
@@ -23,6 +23,7 @@ import org.apache.dubbo.rpc.model.StubMethodDescriptor;
 import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
 import org.apache.dubbo.rpc.stub.StubInvocationUtil;
 
+import java.time.Duration;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -31,6 +32,7 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import io.smallrye.mutiny.Multi;
 import io.smallrye.mutiny.Uni;
+import io.smallrye.mutiny.helpers.test.AssertSubscriber;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.mockito.MockedStatic;
@@ -91,6 +93,7 @@ public class MutinyClientCallsTest {
 
         try (MockedStatic<StubInvocationUtil> mocked = Mockito.mockStatic(StubInvocationUtil.class)) {
             AtomicBoolean stubCalled = new AtomicBoolean(false);
+            CountDownLatch subscribed = new CountDownLatch(1);
 
             mocked.when(() -> StubInvocationUtil.serverStreamCall(
                             Mockito.eq(invoker), Mockito.eq(method), Mockito.eq("testRequest"), Mockito.any()))
@@ -100,7 +103,9 @@ public class MutinyClientCallsTest {
 
                         CallStreamObserver<String> fakeSubscription = new CallStreamObserver<>() {
                             @Override
-                            public void request(int n) {}
+                            public void request(int n) {
+                                /* no-op */
+                            }
 
                             @Override
                             public void setCompression(String compression) {}
@@ -109,8 +114,8 @@ public class MutinyClientCallsTest {
                             public void disableAutoFlowControl() {}
 
                             @Override
-                            public void onNext(String value) {
-                                publisher.onNext(value);
+                            public void onNext(String v) {
+                                publisher.onNext(v);
                             }
 
                             @Override
@@ -123,13 +128,23 @@ public class MutinyClientCallsTest {
                                 publisher.onCompleted();
                             }
                         };
-
                         publisher.onSubscribe(fakeSubscription);
 
+                        // Wait for downstream subscription to complete before emitting data
                         new Thread(() -> {
-                                    publisher.onNext("item1");
-                                    publisher.onNext("item2");
-                                    publisher.onCompleted();
+                                    try {
+                                        if (subscribed.await(5, TimeUnit.SECONDS)) {
+                                            publisher.onNext("item1");
+                                            publisher.onNext("item2");
+                                            publisher.onCompleted();
+                                        } else {
+                                            publisher.onError(
+                                                    new IllegalStateException("Downstream subscription timeout"));
+                                        }
+                                    } catch (InterruptedException e) {
+                                        Thread.currentThread().interrupt();
+                                        publisher.onError(e);
+                                    }
                                 })
                                 .start();
 
@@ -137,15 +152,23 @@ public class MutinyClientCallsTest {
                     });
 
             Uni<String> uniRequest = Uni.createFrom().item("testRequest");
-
             Multi<String> multiResponse = MutinyClientCalls.oneToMany(invoker, uniRequest, method);
 
-            List<String> collectedItems =
-                    multiResponse.collect().asList().await().indefinitely();
+            // Use AssertSubscriber to ensure proper subscription timing
+            AssertSubscriber<String> subscriber = AssertSubscriber.create(Long.MAX_VALUE);
+            multiResponse.subscribe().withSubscriber(subscriber);
+
+            // Wait for subscription to be established
+            subscriber.awaitSubscription();
+            subscribed.countDown(); // Signal that data emission can begin
+
+            // Wait for completion
+            subscriber.awaitCompletion(Duration.ofSeconds(5));
 
+            // Verify results
             Assertions.assertTrue(stubCalled.get(), "StubInvocationUtil.serverStreamCall should be called");
-            Assertions.assertEquals(2, collectedItems.size());
-            Assertions.assertEquals(List.of("item1", "item2"), collectedItems);
+            Assertions.assertEquals(List.of("item1", "item2"), subscriber.getItems());
+            subscriber.assertCompleted();
         }
     }
 

Reply via email to