This is an automated email from the ASF dual-hosted git repository.

earthchen pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new 7b9e65ed06 Revert "optimize performance. decode in user thread 
(#11879)" (#11917)
7b9e65ed06 is described below

commit 7b9e65ed064d2df2deb1ff510470d9151a0f200b
Author: earthchen <[email protected]>
AuthorDate: Fri Mar 24 23:35:13 2023 +0800

    Revert "optimize performance. decode in user thread (#11879)" (#11917)
    
    This reverts commit 95865b04616c7d8b4d6b20ac4232b9499006d7eb.
---
 .../dubbo/common/function/ThrowableSupplier.java   | 27 ---------------
 .../apache/dubbo/common/utils/ExecutorUtil.java    |  6 ----
 .../dubbo/rpc/protocol/tri/DeadlineFuture.java     | 11 +++---
 .../dubbo/rpc/protocol/tri/TripleInvoker.java      |  3 --
 .../dubbo/rpc/protocol/tri/call/ClientCall.java    | 10 ++----
 .../call/ObserverToClientCallListenerAdapter.java  | 12 +++----
 .../rpc/protocol/tri/call/TripleClientCall.java    | 21 +++++-------
 .../protocol/tri/call/TripleMessageProducer.java   | 39 ----------------------
 .../protocol/tri/call/UnaryClientCallListener.java | 33 ++++++++----------
 .../dubbo/rpc/protocol/tri/DeadlineFutureTest.java |  2 +-
 10 files changed, 32 insertions(+), 132 deletions(-)

diff --git 
a/dubbo-common/src/main/java/org/apache/dubbo/common/function/ThrowableSupplier.java
 
b/dubbo-common/src/main/java/org/apache/dubbo/common/function/ThrowableSupplier.java
deleted file mode 100644
index 84befaf9f7..0000000000
--- 
a/dubbo-common/src/main/java/org/apache/dubbo/common/function/ThrowableSupplier.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.common.function;
-
-public interface ThrowableSupplier<T> {
-
-    /**
-     * Gets a result.
-     *
-     * @return a result
-     */
-    T get() throws Throwable;
-}
diff --git 
a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/ExecutorUtil.java 
b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/ExecutorUtil.java
index 714c8ae4fa..6c0bc148a1 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/ExecutorUtil.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/ExecutorUtil.java
@@ -37,8 +37,6 @@ public class ExecutorUtil {
         new LinkedBlockingQueue<Runnable>(100),
         new NamedThreadFactory("Close-ExecutorService-Timer", true));
 
-    private static final Executor DIRECT_EXECUTOR = Runnable::run;
-
     public static boolean isTerminated(Executor executor) {
         if (executor instanceof ExecutorService) {
             if (((ExecutorService) executor).isTerminated()) {
@@ -137,8 +135,4 @@ public class ExecutorUtil {
             future.cancel(true);
         }
     }
-
-    public static Executor directExecutor() {
-        return DIRECT_EXECUTOR;
-    }
 }
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFuture.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFuture.java
index 34097752a8..f2985759fd 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFuture.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFuture.java
@@ -36,7 +36,6 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
 
 public class DeadlineFuture extends CompletableFuture<AppResponse> {
 
@@ -81,7 +80,7 @@ public class DeadlineFuture extends 
CompletableFuture<AppResponse> {
         return future;
     }
 
-    public void received(TriRpcStatus status, Supplier<AppResponse> 
appResponse) {
+    public void received(TriRpcStatus status, AppResponse appResponse) {
         if (status.code != TriRpcStatus.Code.DEADLINE_EXCEEDED) {
             // decrease Time
             if (!timeoutTask.isCancelled()) {
@@ -89,13 +88,11 @@ public class DeadlineFuture extends 
CompletableFuture<AppResponse> {
             }
         }
         if (getExecutor() != null) {
-            getExecutor().execute(() -> doReceived(status, appResponse.get()));
+            getExecutor().execute(() -> doReceived(status, appResponse));
         } else {
-            doReceived(status, appResponse.get());
+            doReceived(status, appResponse);
         }
-    }
-
-    private static final GlobalResourceInitializer<Timer> TIME_OUT_TIMER = new 
GlobalResourceInitializer<>(
+    }    private static final GlobalResourceInitializer<Timer> TIME_OUT_TIMER 
= new GlobalResourceInitializer<>(
         () -> new HashedWheelTimer(new 
NamedThreadFactory("dubbo-future-timeout", true), 30,
             TimeUnit.MILLISECONDS), DeadlineFuture::destroy);
 
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
index d3e7ec4f48..6c11a1061a 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
@@ -24,7 +24,6 @@ import org.apache.dubbo.common.constants.CommonConstants;
 import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
 import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.common.stream.StreamObserver;
-import org.apache.dubbo.common.utils.ExecutorUtil;
 import org.apache.dubbo.remoting.api.connection.AbstractConnectionClient;
 import org.apache.dubbo.rpc.AppResponse;
 import org.apache.dubbo.rpc.AsyncRpcResult;
@@ -133,8 +132,6 @@ public class TripleInvoker<T> extends AbstractInvoker<T> {
         try {
             switch (methodDescriptor.getRpcType()) {
                 case UNARY:
-                    call = new TripleClientCall(connectionClient, 
ExecutorUtil.directExecutor(),
-                        getUrl().getOrDefaultFrameworkModel(), writeQueue);
                     result = invokeUnary(methodDescriptor, invocation, call);
                     break;
                 case SERVER_STREAM:
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java
index a54e7b8df1..4936f54409 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java
@@ -43,9 +43,9 @@ public interface ClientCall {
         /**
          * Callback when message received.
          *
-         * @param messageProducer message producer
+         * @param message message received
          */
-        void onMessage(MessageProducer messageProducer);
+        void onMessage(Object message);
 
         /**
          * Callback when call is finished.
@@ -110,10 +110,4 @@ public interface ClientCall {
      */
     void setCompression(String compression);
 
-    interface MessageProducer {
-
-        Object getMessage() throws Throwable;
-
-    }
-
 }
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ObserverToClientCallListenerAdapter.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ObserverToClientCallListenerAdapter.java
index 40125cdb8d..1c934ce970 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ObserverToClientCallListenerAdapter.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ObserverToClientCallListenerAdapter.java
@@ -38,14 +38,10 @@ public class ObserverToClientCallListenerAdapter implements 
ClientCall.Listener
     }
 
     @Override
-    public void onMessage(ClientCall.MessageProducer messageProducer) {
-        try {
-            delegate.onNext(messageProducer.getMessage());
-            if (call.isAutoRequest()) {
-                call.request(1);
-            }
-        } catch (Throwable e) {
-            delegate.onError(e);
+    public void onMessage(Object message) {
+        delegate.onNext(message);
+        if (call.isAutoRequest()) {
+            call.request(1);
         }
     }
 
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
index f54cd4844a..f1d0a60f6a 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
@@ -77,23 +77,18 @@ public class TripleClientCall implements ClientCall, 
ClientStream.Listener {
             return;
         }
         try {
-            TripleMessageProducer messageProducer = 
TripleMessageProducer.withSupplier(() ->
-                    requestMetadata.packableMethod.parseResponse(message));
-            listener.onMessage(messageProducer);
+            final Object unpacked = 
requestMetadata.packableMethod.parseResponse(message);
+            listener.onMessage(unpacked);
         } catch (Throwable t) {
-            onDeserializeError(t);
+            TriRpcStatus status = 
TriRpcStatus.INTERNAL.withDescription("Deserialize response failed")
+                .withCause(t);
+            cancelByLocal(status.asException());
+            listener.onClose(status,null);
+            LOGGER.error(PROTOCOL_FAILED_RESPONSE, "", "", 
String.format("Failed to deserialize triple response, service=%s, 
method=%s,connection=%s",
+                    connectionClient, requestMetadata.service, 
requestMetadata.method.getMethodName()), t);
         }
     }
 
-    private void onDeserializeError(Throwable t){
-        TriRpcStatus status = 
TriRpcStatus.INTERNAL.withDescription("Deserialize response failed")
-            .withCause(t);
-        cancelByLocal(status.asException());
-        listener.onClose(status,null);
-        LOGGER.error(PROTOCOL_FAILED_RESPONSE, "", "", String.format("Failed 
to deserialize triple response, service=%s, method=%s,connection=%s",
-            connectionClient, requestMetadata.service, 
requestMetadata.method.getMethodName()), t);
-    }
-
     @Override
     public void onCancelByRemote(TriRpcStatus status) {
         if (canceled) {
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleMessageProducer.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleMessageProducer.java
deleted file mode 100644
index e40f03cce9..0000000000
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleMessageProducer.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dubbo.rpc.protocol.tri.call;
-
-import org.apache.dubbo.common.function.ThrowableSupplier;
-
-
-class TripleMessageProducer implements ClientCall.MessageProducer {
-
-    private final ThrowableSupplier<Object> throwableSupplier;
-
-    private TripleMessageProducer(ThrowableSupplier<Object> throwableSupplier) 
{
-        this.throwableSupplier = throwableSupplier;
-    }
-
-    @Override
-    public Object getMessage() throws Throwable {
-        return throwableSupplier.get();
-    }
-
-    public static TripleMessageProducer withSupplier(ThrowableSupplier<Object> 
supplier) {
-        return new TripleMessageProducer(supplier);
-    }
-}
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/UnaryClientCallListener.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/UnaryClientCallListener.java
index 2829089fa0..2f00c15ac1 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/UnaryClientCallListener.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/UnaryClientCallListener.java
@@ -26,38 +26,31 @@ import java.util.Map;
 public class UnaryClientCallListener implements ClientCall.Listener {
 
     private final DeadlineFuture future;
-    private ClientCall.MessageProducer messageProducer;
+    private Object appResponse;
 
     public UnaryClientCallListener(DeadlineFuture deadlineFuture) {
         this.future = deadlineFuture;
     }
 
     @Override
-    public void onMessage(ClientCall.MessageProducer messageProducer) {
-        this.messageProducer = messageProducer;
+    public void onMessage(Object message) {
+        this.appResponse = message;
     }
 
     @Override
     public void onClose(TriRpcStatus status, Map<String, Object> trailers) {
-        future.received(status, () -> {
-            AppResponse result = new AppResponse();
-            result.setObjectAttachments(trailers);
-            if (status.isOk()) {
-                try {
-                    Object appResponse = messageProducer.getMessage();
-                    if (appResponse instanceof Exception) {
-                        result.setException((Exception) appResponse);
-                    } else {
-                        result.setValue(appResponse);
-                    }
-                } catch (Throwable e) {
-                    result.setException(e);
-                }
+        AppResponse result = new AppResponse();
+        result.setObjectAttachments(trailers);
+        if (status.isOk()) {
+            if (appResponse instanceof Exception) {
+                result.setException((Exception) appResponse);
             } else {
-                result.setException(status.asException());
+                result.setValue(appResponse);
             }
-            return result;
-        });
+         } else {
+            result.setException(status.asException());
+        }
+        future.received(status, result);
     }
 
     @Override
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFutureTest.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFutureTest.java
index 615e25938b..09d03513eb 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFutureTest.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFutureTest.java
@@ -45,7 +45,7 @@ class DeadlineFutureTest {
         DeadlineFuture success = DeadlineFuture.newFuture(service, method, 
address, 1000,
             ImmediateEventExecutor.INSTANCE);
         AppResponse response = new AppResponse();
-        success.received(TriRpcStatus.OK, () -> response);
+        success.received(TriRpcStatus.OK, response);
         AppResponse response1 = success.get();
         Assertions.assertEquals(response, response1);
     }

Reply via email to