This is an automated email from the ASF dual-hosted git repository.
guohao pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.1 by this push:
new 16fc7b55ef Remove processing streamObserver in
TripleInvoker#invokeUnary (#10398)
16fc7b55ef is described below
commit 16fc7b55ef20f97d2b5e7b983d4d10eb9afc4c82
Author: Kunshuai Zhu <[email protected]>
AuthorDate: Wed Aug 3 14:08:40 2022 +0800
Remove processing streamObserver in TripleInvoker#invokeUnary (#10398)
* Remove processing streamObserver in TripleInvoker#invokeUnary
* remove unaryCall with streamObserver in StubInvocationUtil
* add Compatible with unaryCall
* add unit test
---
.../apache/dubbo/rpc/stub/StubInvocationUtil.java | 8 +++++-
.../dubbo/rpc/stub/StubInvocationUtilTest.java | 9 ++----
.../dubbo/rpc/protocol/tri/TripleInvoker.java | 32 ++++------------------
3 files changed, 15 insertions(+), 34 deletions(-)
diff --git
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/stub/StubInvocationUtil.java
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/stub/StubInvocationUtil.java
index 4e88fb780e..cb5232744f 100644
---
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/stub/StubInvocationUtil.java
+++
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/stub/StubInvocationUtil.java
@@ -33,7 +33,13 @@ public class StubInvocationUtil {
public static <T, R> void unaryCall(Invoker<?> invoker, MethodDescriptor
method, T request,
StreamObserver<R> responseObserver) {
- call(invoker, method, new Object[]{request, responseObserver});
+ try {
+ Object res = unaryCall(invoker, method, request);
+ responseObserver.onNext((R) res);
+ } catch (Exception e) {
+ responseObserver.onError(e);
+ }
+ responseObserver.onCompleted();
}
public static <T, R> StreamObserver<T> biOrClientStreamCall(Invoker<?>
invoker,
diff --git
a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/stub/StubInvocationUtilTest.java
b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/stub/StubInvocationUtilTest.java
index 476911f5f9..e5196f3237 100644
---
a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/stub/StubInvocationUtilTest.java
+++
b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/stub/StubInvocationUtilTest.java
@@ -136,13 +136,8 @@ class StubInvocationUtilTest {
Result result = Mockito.mock(Result.class);
String response = "response";
when(invoker.invoke(any(Invocation.class)))
- .then(invocationOnMock -> {
- Invocation invocation = (Invocation)
invocationOnMock.getArguments()[0];
- StreamObserver<Object> observer = (StreamObserver<Object>)
invocation.getArguments()[1];
- observer.onNext(response);
- observer.onCompleted();
- return result;
- });
+ .then(invocationOnMock -> result);
+ when(result.recreate()).thenReturn(response);
MethodDescriptor method = Mockito.mock(MethodDescriptor.class);
when(method.getParameterClasses())
.thenReturn(new Class[]{String.class});
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
index c6a534d333..ce2ee44066 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
@@ -196,35 +196,15 @@ public class TripleInvoker<T> extends AbstractInvoker<T> {
RequestMetadata request = createRequest(methodDescriptor, invocation,
timeout);
final Object pureArgument;
- if (methodDescriptor.getParameterClasses().length == 2
- && methodDescriptor.getParameterClasses()[1].isAssignableFrom(
- StreamObserver.class)) {
- StreamObserver<Object> observer = (StreamObserver<Object>)
invocation.getArguments()[1];
- future.whenComplete((r, t) -> {
- if (t != null) {
- observer.onError(t);
- return;
- }
- if (r.hasException()) {
- observer.onError(r.getException());
- return;
- }
- observer.onNext(r.getValue());
- observer.onCompleted();
- });
+
+ if (methodDescriptor instanceof StubMethodDescriptor) {
pureArgument = invocation.getArguments()[0];
- result = new AsyncRpcResult(CompletableFuture.completedFuture(new
AppResponse()),
- invocation);
} else {
- if (methodDescriptor instanceof StubMethodDescriptor) {
- pureArgument = invocation.getArguments()[0];
- } else {
- pureArgument = invocation.getArguments();
- }
- result = new AsyncRpcResult(future, invocation);
- result.setExecutor(callbackExecutor);
- FutureContext.getContext().setCompatibleFuture(future);
+ pureArgument = invocation.getArguments();
}
+ result = new AsyncRpcResult(future, invocation);
+ FutureContext.getContext().setCompatibleFuture(future);
+ result.setExecutor(callbackExecutor);
ClientCall.Listener callListener = new UnaryClientCallListener(future);
final StreamObserver<Object> requestObserver = call.start(request,
callListener);