This is an automated email from the ASF dual-hosted git repository.

guohao pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.1 by this push:
     new 16fc7b55ef Remove processing streamObserver in TripleInvoker#invokeUnary (#10398)
16fc7b55ef is described below

commit 16fc7b55ef20f97d2b5e7b983d4d10eb9afc4c82
Author: Kunshuai Zhu <[email protected]>
AuthorDate: Wed Aug 3 14:08:40 2022 +0800

    Remove processing streamObserver in TripleInvoker#invokeUnary (#10398)
    
    * Remove processing streamObserver in TripleInvoker#invokeUnary
    
    * remove unaryCall with streamObserver in StubInvocationUtil
    
    * add Compatible with unaryCall
    
    * add unit test
---
 .../apache/dubbo/rpc/stub/StubInvocationUtil.java  |  8 +++++-
 .../dubbo/rpc/stub/StubInvocationUtilTest.java     |  9 ++----
 .../dubbo/rpc/protocol/tri/TripleInvoker.java      | 32 ++++------------------
 3 files changed, 15 insertions(+), 34 deletions(-)

diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/stub/StubInvocationUtil.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/stub/StubInvocationUtil.java
index 4e88fb780e..cb5232744f 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/stub/StubInvocationUtil.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/stub/StubInvocationUtil.java
@@ -33,7 +33,13 @@ public class StubInvocationUtil {
 
     public static <T, R> void unaryCall(Invoker<?> invoker, MethodDescriptor method, T request,
         StreamObserver<R> responseObserver) {
-        call(invoker, method, new Object[]{request, responseObserver});
+        try {
+            Object res = unaryCall(invoker, method, request);
+            responseObserver.onNext((R) res);
+        } catch (Exception e) {
+            responseObserver.onError(e);
+        }
+        responseObserver.onCompleted();
     }
 
     public static <T, R> StreamObserver<T> biOrClientStreamCall(Invoker<?> invoker,
diff --git a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/stub/StubInvocationUtilTest.java b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/stub/StubInvocationUtilTest.java
index 476911f5f9..e5196f3237 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/stub/StubInvocationUtilTest.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/stub/StubInvocationUtilTest.java
@@ -136,13 +136,8 @@ class StubInvocationUtilTest {
         Result result = Mockito.mock(Result.class);
         String response = "response";
         when(invoker.invoke(any(Invocation.class)))
-            .then(invocationOnMock -> {
-                Invocation invocation = (Invocation) invocationOnMock.getArguments()[0];
-                StreamObserver<Object> observer = (StreamObserver<Object>) invocation.getArguments()[1];
-                observer.onNext(response);
-                observer.onCompleted();
-                return result;
-            });
+            .then(invocationOnMock -> result);
+        when(result.recreate()).thenReturn(response);
         MethodDescriptor method = Mockito.mock(MethodDescriptor.class);
         when(method.getParameterClasses())
             .thenReturn(new Class[]{String.class});
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
index c6a534d333..ce2ee44066 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
@@ -196,35 +196,15 @@ public class TripleInvoker<T> extends AbstractInvoker<T> {
         RequestMetadata request = createRequest(methodDescriptor, invocation, timeout);
 
         final Object pureArgument;
-        if (methodDescriptor.getParameterClasses().length == 2
-            && methodDescriptor.getParameterClasses()[1].isAssignableFrom(
-            StreamObserver.class)) {
-            StreamObserver<Object> observer = (StreamObserver<Object>) invocation.getArguments()[1];
-            future.whenComplete((r, t) -> {
-                if (t != null) {
-                    observer.onError(t);
-                    return;
-                }
-                if (r.hasException()) {
-                    observer.onError(r.getException());
-                    return;
-                }
-                observer.onNext(r.getValue());
-                observer.onCompleted();
-            });
+
+        if (methodDescriptor instanceof StubMethodDescriptor) {
             pureArgument = invocation.getArguments()[0];
-            result = new AsyncRpcResult(CompletableFuture.completedFuture(new AppResponse()),
-                invocation);
         } else {
-            if (methodDescriptor instanceof StubMethodDescriptor) {
-                pureArgument = invocation.getArguments()[0];
-            } else {
-                pureArgument = invocation.getArguments();
-            }
-            result = new AsyncRpcResult(future, invocation);
-            result.setExecutor(callbackExecutor);
-            FutureContext.getContext().setCompatibleFuture(future);
+            pureArgument = invocation.getArguments();
         }
+        result = new AsyncRpcResult(future, invocation);
+        FutureContext.getContext().setCompatibleFuture(future);
+        result.setExecutor(callbackExecutor);
         ClientCall.Listener callListener = new UnaryClientCallListener(future);
 
         final StreamObserver<Object> requestObserver = call.start(request, callListener);

Reply via email to