This is an automated email from the ASF dual-hosted git repository. earthchen pushed a commit to branch revert-11879-user_decode in repository https://gitbox.apache.org/repos/asf/dubbo.git
commit 2115c69c12a075629d7669591387369784f6caa2 Author: earthchen <[email protected]> AuthorDate: Fri Mar 24 23:33:35 2023 +0800 Revert "optimize performance. decode in user thread (#11879)" This reverts commit 95865b04616c7d8b4d6b20ac4232b9499006d7eb. --- .../dubbo/common/function/ThrowableSupplier.java | 27 --------------- .../apache/dubbo/common/utils/ExecutorUtil.java | 6 ---- .../dubbo/rpc/protocol/tri/DeadlineFuture.java | 11 +++--- .../dubbo/rpc/protocol/tri/TripleInvoker.java | 3 -- .../dubbo/rpc/protocol/tri/call/ClientCall.java | 10 ++---- .../call/ObserverToClientCallListenerAdapter.java | 12 +++---- .../rpc/protocol/tri/call/TripleClientCall.java | 21 +++++------- .../protocol/tri/call/TripleMessageProducer.java | 39 ---------------------- .../protocol/tri/call/UnaryClientCallListener.java | 33 ++++++++---------- .../dubbo/rpc/protocol/tri/DeadlineFutureTest.java | 2 +- 10 files changed, 32 insertions(+), 132 deletions(-) diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/function/ThrowableSupplier.java b/dubbo-common/src/main/java/org/apache/dubbo/common/function/ThrowableSupplier.java deleted file mode 100644 index 84befaf9f7..0000000000 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/function/ThrowableSupplier.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.dubbo.common.function; - -public interface ThrowableSupplier<T> { - - /** - * Gets a result. - * - * @return a result - */ - T get() throws Throwable; -} diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/ExecutorUtil.java b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/ExecutorUtil.java index 714c8ae4fa..6c0bc148a1 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/ExecutorUtil.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/ExecutorUtil.java @@ -37,8 +37,6 @@ public class ExecutorUtil { new LinkedBlockingQueue<Runnable>(100), new NamedThreadFactory("Close-ExecutorService-Timer", true)); - private static final Executor DIRECT_EXECUTOR = Runnable::run; - public static boolean isTerminated(Executor executor) { if (executor instanceof ExecutorService) { if (((ExecutorService) executor).isTerminated()) { @@ -137,8 +135,4 @@ public class ExecutorUtil { future.cancel(true); } } - - public static Executor directExecutor() { - return DIRECT_EXECUTOR; - } } diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFuture.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFuture.java index 34097752a8..f2985759fd 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFuture.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFuture.java @@ -36,7 +36,6 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; public class DeadlineFuture extends CompletableFuture<AppResponse> { @@ -81,7 +80,7 @@ public class DeadlineFuture extends CompletableFuture<AppResponse> { return future; } - public void received(TriRpcStatus status, Supplier<AppResponse> appResponse) { + public void received(TriRpcStatus status, AppResponse appResponse) { if (status.code != TriRpcStatus.Code.DEADLINE_EXCEEDED) { // decrease Time if (!timeoutTask.isCancelled()) { @@ -89,13 +88,11 @@ public class DeadlineFuture extends CompletableFuture<AppResponse> { } } if (getExecutor() != null) { - getExecutor().execute(() -> doReceived(status, appResponse.get())); + getExecutor().execute(() -> doReceived(status, appResponse)); } else { - doReceived(status, appResponse.get()); + doReceived(status, appResponse); } - } - - private static final GlobalResourceInitializer<Timer> TIME_OUT_TIMER = new GlobalResourceInitializer<>( + } private static final GlobalResourceInitializer<Timer> TIME_OUT_TIMER = new GlobalResourceInitializer<>( () -> new HashedWheelTimer(new NamedThreadFactory("dubbo-future-timeout", true), 30, TimeUnit.MILLISECONDS), DeadlineFuture::destroy); diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java index d3e7ec4f48..6c11a1061a 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java @@ -24,7 +24,6 @@ import org.apache.dubbo.common.constants.CommonConstants; import org.apache.dubbo.common.logger.ErrorTypeAwareLogger; import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.common.stream.StreamObserver; -import org.apache.dubbo.common.utils.ExecutorUtil; import org.apache.dubbo.remoting.api.connection.AbstractConnectionClient; import org.apache.dubbo.rpc.AppResponse; import org.apache.dubbo.rpc.AsyncRpcResult; @@ -133,8 +132,6 @@ public class TripleInvoker<T> extends AbstractInvoker<T> { try { switch (methodDescriptor.getRpcType()) { case UNARY: - call = new TripleClientCall(connectionClient, ExecutorUtil.directExecutor(), - getUrl().getOrDefaultFrameworkModel(), writeQueue); result = invokeUnary(methodDescriptor, invocation, call); break; case SERVER_STREAM: diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java index a54e7b8df1..4936f54409 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java @@ -43,9 +43,9 @@ public interface ClientCall { /** * Callback when message received. * - * @param messageProducer message producer + * @param message message received */ - void onMessage(MessageProducer messageProducer); + void onMessage(Object message); /** * Callback when call is finished. @@ -110,10 +110,4 @@ public interface ClientCall { */ void setCompression(String compression); - interface MessageProducer { - - Object getMessage() throws Throwable; - - } - } diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ObserverToClientCallListenerAdapter.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ObserverToClientCallListenerAdapter.java index 40125cdb8d..1c934ce970 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ObserverToClientCallListenerAdapter.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ObserverToClientCallListenerAdapter.java @@ -38,14 +38,10 @@ public class ObserverToClientCallListenerAdapter implements ClientCall.Listener } @Override - public void onMessage(ClientCall.MessageProducer messageProducer) { - try { - delegate.onNext(messageProducer.getMessage()); - if (call.isAutoRequest()) { - call.request(1); - } - } catch (Throwable e) { - delegate.onError(e); + public void onMessage(Object message) { + delegate.onNext(message); + if (call.isAutoRequest()) { + call.request(1); } } diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java index f54cd4844a..f1d0a60f6a 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java @@ -77,23 +77,18 @@ public class TripleClientCall implements ClientCall, ClientStream.Listener { return; } try { - TripleMessageProducer messageProducer = TripleMessageProducer.withSupplier(() -> - requestMetadata.packableMethod.parseResponse(message)); - listener.onMessage(messageProducer); + final Object unpacked = requestMetadata.packableMethod.parseResponse(message); + listener.onMessage(unpacked); } catch (Throwable t) { - onDeserializeError(t); + TriRpcStatus status = TriRpcStatus.INTERNAL.withDescription("Deserialize response failed") + .withCause(t); + cancelByLocal(status.asException()); + listener.onClose(status,null); + LOGGER.error(PROTOCOL_FAILED_RESPONSE, "", "", String.format("Failed to deserialize triple response, service=%s, method=%s,connection=%s", + connectionClient, requestMetadata.service, requestMetadata.method.getMethodName()), t); } } - private void onDeserializeError(Throwable t){ - TriRpcStatus status = TriRpcStatus.INTERNAL.withDescription("Deserialize response failed") - .withCause(t); - cancelByLocal(status.asException()); - listener.onClose(status,null); - LOGGER.error(PROTOCOL_FAILED_RESPONSE, "", "", String.format("Failed to deserialize triple response, service=%s, method=%s,connection=%s", - connectionClient, requestMetadata.service, requestMetadata.method.getMethodName()), t); - } - @Override public void onCancelByRemote(TriRpcStatus status) { if (canceled) { diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleMessageProducer.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleMessageProducer.java deleted file mode 100644 index e40f03cce9..0000000000 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleMessageProducer.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dubbo.rpc.protocol.tri.call; - -import org.apache.dubbo.common.function.ThrowableSupplier; - - -class TripleMessageProducer implements ClientCall.MessageProducer { - - private final ThrowableSupplier<Object> throwableSupplier; - - private TripleMessageProducer(ThrowableSupplier<Object> throwableSupplier) { - this.throwableSupplier = throwableSupplier; - } - - @Override - public Object getMessage() throws Throwable { - return throwableSupplier.get(); - } - - public static TripleMessageProducer withSupplier(ThrowableSupplier<Object> supplier) { - return new TripleMessageProducer(supplier); - } -} diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/UnaryClientCallListener.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/UnaryClientCallListener.java index 2829089fa0..2f00c15ac1 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/UnaryClientCallListener.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/UnaryClientCallListener.java @@ -26,38 +26,31 @@ import java.util.Map; public class UnaryClientCallListener implements ClientCall.Listener { private final DeadlineFuture future; - private ClientCall.MessageProducer messageProducer; + private Object appResponse; public UnaryClientCallListener(DeadlineFuture deadlineFuture) { this.future = deadlineFuture; } @Override - public void onMessage(ClientCall.MessageProducer messageProducer) { - this.messageProducer = messageProducer; + public void onMessage(Object message) { + this.appResponse = message; } @Override public void onClose(TriRpcStatus status, Map<String, Object> trailers) { - future.received(status, () -> { - AppResponse result = new AppResponse(); - result.setObjectAttachments(trailers); - if (status.isOk()) { - try { - Object appResponse = messageProducer.getMessage(); - if (appResponse instanceof Exception) { - result.setException((Exception) appResponse); - } else { - result.setValue(appResponse); - } - } catch (Throwable e) { - result.setException(e); - } + AppResponse result = new AppResponse(); + result.setObjectAttachments(trailers); + if (status.isOk()) { + if (appResponse instanceof Exception) { + result.setException((Exception) appResponse); } else { - result.setException(status.asException()); + result.setValue(appResponse); } - return result; - }); + } else { + result.setException(status.asException()); + } + future.received(status, result); } @Override diff --git a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFutureTest.java b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFutureTest.java index 615e25938b..09d03513eb 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFutureTest.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFutureTest.java @@ -45,7 +45,7 @@ class DeadlineFutureTest { DeadlineFuture success = DeadlineFuture.newFuture(service, method, address, 1000, ImmediateEventExecutor.INSTANCE); AppResponse response = new AppResponse(); - success.received(TriRpcStatus.OK, () -> response); + success.received(TriRpcStatus.OK, response); AppResponse response1 = success.get(); Assertions.assertEquals(response, response1); }
