This is an automated email from the ASF dual-hosted git repository.
oxsean pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 07e1c2a9c7 fix mutiny race condition issue (#15662)
07e1c2a9c7 is described below
commit 07e1c2a9c723593e9078eb6e0099ad6482ca0c15
Author: Moritz Arena <[email protected]>
AuthorDate: Wed Sep 3 15:55:59 2025 +0800
fix mutiny race condition issue (#15662)
---
.../apache/dubbo/mutiny/MutinyClientCallsTest.java | 47 ++++++++++++++++------
1 file changed, 35 insertions(+), 12 deletions(-)
diff --git
a/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/MutinyClientCallsTest.java
b/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/MutinyClientCallsTest.java
index 4863b15640..22e48a13e4 100644
---
a/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/MutinyClientCallsTest.java
+++
b/dubbo-plugin/dubbo-mutiny/src/test/java/org/apache/dubbo/mutiny/MutinyClientCallsTest.java
@@ -23,6 +23,7 @@ import org.apache.dubbo.rpc.model.StubMethodDescriptor;
import org.apache.dubbo.rpc.protocol.tri.observer.CallStreamObserver;
import org.apache.dubbo.rpc.stub.StubInvocationUtil;
+import java.time.Duration;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -31,6 +32,7 @@ import java.util.concurrent.atomic.AtomicReference;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
+import io.smallrye.mutiny.helpers.test.AssertSubscriber;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
@@ -91,6 +93,7 @@ public class MutinyClientCallsTest {
try (MockedStatic<StubInvocationUtil> mocked =
Mockito.mockStatic(StubInvocationUtil.class)) {
AtomicBoolean stubCalled = new AtomicBoolean(false);
+ CountDownLatch subscribed = new CountDownLatch(1);
mocked.when(() -> StubInvocationUtil.serverStreamCall(
Mockito.eq(invoker), Mockito.eq(method),
Mockito.eq("testRequest"), Mockito.any()))
@@ -100,7 +103,9 @@ public class MutinyClientCallsTest {
CallStreamObserver<String> fakeSubscription = new
CallStreamObserver<>() {
@Override
- public void request(int n) {}
+ public void request(int n) {
+ /* no-op */
+ }
@Override
public void setCompression(String compression) {}
@@ -109,8 +114,8 @@ public class MutinyClientCallsTest {
public void disableAutoFlowControl() {}
@Override
- public void onNext(String value) {
- publisher.onNext(value);
+ public void onNext(String v) {
+ publisher.onNext(v);
}
@Override
@@ -123,13 +128,23 @@ public class MutinyClientCallsTest {
publisher.onCompleted();
}
};
-
publisher.onSubscribe(fakeSubscription);
+ // Wait for downstream subscription to complete before
emitting data
new Thread(() -> {
- publisher.onNext("item1");
- publisher.onNext("item2");
- publisher.onCompleted();
+ try {
+ if (subscribed.await(5,
TimeUnit.SECONDS)) {
+ publisher.onNext("item1");
+ publisher.onNext("item2");
+ publisher.onCompleted();
+ } else {
+ publisher.onError(
+ new
IllegalStateException("Downstream subscription timeout"));
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ publisher.onError(e);
+ }
})
.start();
@@ -137,15 +152,23 @@ public class MutinyClientCallsTest {
});
Uni<String> uniRequest = Uni.createFrom().item("testRequest");
-
Multi<String> multiResponse = MutinyClientCalls.oneToMany(invoker,
uniRequest, method);
- List<String> collectedItems =
- multiResponse.collect().asList().await().indefinitely();
+ // Use AssertSubscriber to ensure proper subscription timing
+ AssertSubscriber<String> subscriber =
AssertSubscriber.create(Long.MAX_VALUE);
+ multiResponse.subscribe().withSubscriber(subscriber);
+
+ // Wait for subscription to be established
+ subscriber.awaitSubscription();
+ subscribed.countDown(); // Signal that data emission can begin
+
+ // Wait for completion
+ subscriber.awaitCompletion(Duration.ofSeconds(5));
+ // Verify results
Assertions.assertTrue(stubCalled.get(),
"StubInvocationUtil.serverStreamCall should be called");
- Assertions.assertEquals(2, collectedItems.size());
- Assertions.assertEquals(List.of("item1", "item2"), collectedItems);
+ Assertions.assertEquals(List.of("item1", "item2"),
subscriber.getItems());
+ subscriber.assertCompleted();
}
}