This is an automated email from the ASF dual-hosted git repository.
earthchen pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.2 by this push:
new 7b9e65ed06 Revert "optimize performance. decode in user thread (#11879)" (#11917)
7b9e65ed06 is described below
commit 7b9e65ed064d2df2deb1ff510470d9151a0f200b
Author: earthchen <[email protected]>
AuthorDate: Fri Mar 24 23:35:13 2023 +0800
Revert "optimize performance. decode in user thread (#11879)" (#11917)
This reverts commit 95865b04616c7d8b4d6b20ac4232b9499006d7eb.
---
.../dubbo/common/function/ThrowableSupplier.java | 27 ---------------
.../apache/dubbo/common/utils/ExecutorUtil.java | 6 ----
.../dubbo/rpc/protocol/tri/DeadlineFuture.java | 11 +++---
.../dubbo/rpc/protocol/tri/TripleInvoker.java | 3 --
.../dubbo/rpc/protocol/tri/call/ClientCall.java | 10 ++----
.../call/ObserverToClientCallListenerAdapter.java | 12 +++----
.../rpc/protocol/tri/call/TripleClientCall.java | 21 +++++-------
.../protocol/tri/call/TripleMessageProducer.java | 39 ----------------------
.../protocol/tri/call/UnaryClientCallListener.java | 33 ++++++++----------
.../dubbo/rpc/protocol/tri/DeadlineFutureTest.java | 2 +-
10 files changed, 32 insertions(+), 132 deletions(-)
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/common/function/ThrowableSupplier.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/function/ThrowableSupplier.java
deleted file mode 100644
index 84befaf9f7..0000000000
---
a/dubbo-common/src/main/java/org/apache/dubbo/common/function/ThrowableSupplier.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.common.function;
-
-public interface ThrowableSupplier<T> {
-
- /**
- * Gets a result.
- *
- * @return a result
- */
- T get() throws Throwable;
-}
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/ExecutorUtil.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/ExecutorUtil.java
index 714c8ae4fa..6c0bc148a1 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/ExecutorUtil.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/ExecutorUtil.java
@@ -37,8 +37,6 @@ public class ExecutorUtil {
new LinkedBlockingQueue<Runnable>(100),
new NamedThreadFactory("Close-ExecutorService-Timer", true));
- private static final Executor DIRECT_EXECUTOR = Runnable::run;
-
public static boolean isTerminated(Executor executor) {
if (executor instanceof ExecutorService) {
if (((ExecutorService) executor).isTerminated()) {
@@ -137,8 +135,4 @@ public class ExecutorUtil {
future.cancel(true);
}
}
-
- public static Executor directExecutor() {
- return DIRECT_EXECUTOR;
- }
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFuture.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFuture.java
index 34097752a8..f2985759fd 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFuture.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFuture.java
@@ -36,7 +36,6 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
public class DeadlineFuture extends CompletableFuture<AppResponse> {
@@ -81,7 +80,7 @@ public class DeadlineFuture extends
CompletableFuture<AppResponse> {
return future;
}
- public void received(TriRpcStatus status, Supplier<AppResponse>
appResponse) {
+ public void received(TriRpcStatus status, AppResponse appResponse) {
if (status.code != TriRpcStatus.Code.DEADLINE_EXCEEDED) {
// decrease Time
if (!timeoutTask.isCancelled()) {
@@ -89,13 +88,11 @@ public class DeadlineFuture extends
CompletableFuture<AppResponse> {
}
}
if (getExecutor() != null) {
- getExecutor().execute(() -> doReceived(status, appResponse.get()));
+ getExecutor().execute(() -> doReceived(status, appResponse));
} else {
- doReceived(status, appResponse.get());
+ doReceived(status, appResponse);
}
- }
-
- private static final GlobalResourceInitializer<Timer> TIME_OUT_TIMER = new
GlobalResourceInitializer<>(
+ } private static final GlobalResourceInitializer<Timer> TIME_OUT_TIMER
= new GlobalResourceInitializer<>(
() -> new HashedWheelTimer(new
NamedThreadFactory("dubbo-future-timeout", true), 30,
TimeUnit.MILLISECONDS), DeadlineFuture::destroy);
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
index d3e7ec4f48..6c11a1061a 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
@@ -24,7 +24,6 @@ import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.stream.StreamObserver;
-import org.apache.dubbo.common.utils.ExecutorUtil;
import org.apache.dubbo.remoting.api.connection.AbstractConnectionClient;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.AsyncRpcResult;
@@ -133,8 +132,6 @@ public class TripleInvoker<T> extends AbstractInvoker<T> {
try {
switch (methodDescriptor.getRpcType()) {
case UNARY:
- call = new TripleClientCall(connectionClient,
ExecutorUtil.directExecutor(),
- getUrl().getOrDefaultFrameworkModel(), writeQueue);
result = invokeUnary(methodDescriptor, invocation, call);
break;
case SERVER_STREAM:
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java
index a54e7b8df1..4936f54409 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ClientCall.java
@@ -43,9 +43,9 @@ public interface ClientCall {
/**
* Callback when message received.
*
- * @param messageProducer message producer
+ * @param message message received
*/
- void onMessage(MessageProducer messageProducer);
+ void onMessage(Object message);
/**
* Callback when call is finished.
@@ -110,10 +110,4 @@ public interface ClientCall {
*/
void setCompression(String compression);
- interface MessageProducer {
-
- Object getMessage() throws Throwable;
-
- }
-
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ObserverToClientCallListenerAdapter.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ObserverToClientCallListenerAdapter.java
index 40125cdb8d..1c934ce970 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ObserverToClientCallListenerAdapter.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/ObserverToClientCallListenerAdapter.java
@@ -38,14 +38,10 @@ public class ObserverToClientCallListenerAdapter implements
ClientCall.Listener
}
@Override
- public void onMessage(ClientCall.MessageProducer messageProducer) {
- try {
- delegate.onNext(messageProducer.getMessage());
- if (call.isAutoRequest()) {
- call.request(1);
- }
- } catch (Throwable e) {
- delegate.onError(e);
+ public void onMessage(Object message) {
+ delegate.onNext(message);
+ if (call.isAutoRequest()) {
+ call.request(1);
}
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
index f54cd4844a..f1d0a60f6a 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleClientCall.java
@@ -77,23 +77,18 @@ public class TripleClientCall implements ClientCall,
ClientStream.Listener {
return;
}
try {
- TripleMessageProducer messageProducer =
TripleMessageProducer.withSupplier(() ->
- requestMetadata.packableMethod.parseResponse(message));
- listener.onMessage(messageProducer);
+ final Object unpacked =
requestMetadata.packableMethod.parseResponse(message);
+ listener.onMessage(unpacked);
} catch (Throwable t) {
- onDeserializeError(t);
+ TriRpcStatus status =
TriRpcStatus.INTERNAL.withDescription("Deserialize response failed")
+ .withCause(t);
+ cancelByLocal(status.asException());
+ listener.onClose(status,null);
+ LOGGER.error(PROTOCOL_FAILED_RESPONSE, "", "",
String.format("Failed to deserialize triple response, service=%s,
method=%s,connection=%s",
+ connectionClient, requestMetadata.service,
requestMetadata.method.getMethodName()), t);
}
}
- private void onDeserializeError(Throwable t){
- TriRpcStatus status =
TriRpcStatus.INTERNAL.withDescription("Deserialize response failed")
- .withCause(t);
- cancelByLocal(status.asException());
- listener.onClose(status,null);
- LOGGER.error(PROTOCOL_FAILED_RESPONSE, "", "", String.format("Failed
to deserialize triple response, service=%s, method=%s,connection=%s",
- connectionClient, requestMetadata.service,
requestMetadata.method.getMethodName()), t);
- }
-
@Override
public void onCancelByRemote(TriRpcStatus status) {
if (canceled) {
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleMessageProducer.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleMessageProducer.java
deleted file mode 100644
index e40f03cce9..0000000000
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/TripleMessageProducer.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dubbo.rpc.protocol.tri.call;
-
-import org.apache.dubbo.common.function.ThrowableSupplier;
-
-
-class TripleMessageProducer implements ClientCall.MessageProducer {
-
- private final ThrowableSupplier<Object> throwableSupplier;
-
- private TripleMessageProducer(ThrowableSupplier<Object> throwableSupplier)
{
- this.throwableSupplier = throwableSupplier;
- }
-
- @Override
- public Object getMessage() throws Throwable {
- return throwableSupplier.get();
- }
-
- public static TripleMessageProducer withSupplier(ThrowableSupplier<Object>
supplier) {
- return new TripleMessageProducer(supplier);
- }
-}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/UnaryClientCallListener.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/UnaryClientCallListener.java
index 2829089fa0..2f00c15ac1 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/UnaryClientCallListener.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/call/UnaryClientCallListener.java
@@ -26,38 +26,31 @@ import java.util.Map;
public class UnaryClientCallListener implements ClientCall.Listener {
private final DeadlineFuture future;
- private ClientCall.MessageProducer messageProducer;
+ private Object appResponse;
public UnaryClientCallListener(DeadlineFuture deadlineFuture) {
this.future = deadlineFuture;
}
@Override
- public void onMessage(ClientCall.MessageProducer messageProducer) {
- this.messageProducer = messageProducer;
+ public void onMessage(Object message) {
+ this.appResponse = message;
}
@Override
public void onClose(TriRpcStatus status, Map<String, Object> trailers) {
- future.received(status, () -> {
- AppResponse result = new AppResponse();
- result.setObjectAttachments(trailers);
- if (status.isOk()) {
- try {
- Object appResponse = messageProducer.getMessage();
- if (appResponse instanceof Exception) {
- result.setException((Exception) appResponse);
- } else {
- result.setValue(appResponse);
- }
- } catch (Throwable e) {
- result.setException(e);
- }
+ AppResponse result = new AppResponse();
+ result.setObjectAttachments(trailers);
+ if (status.isOk()) {
+ if (appResponse instanceof Exception) {
+ result.setException((Exception) appResponse);
} else {
- result.setException(status.asException());
+ result.setValue(appResponse);
}
- return result;
- });
+ } else {
+ result.setException(status.asException());
+ }
+ future.received(status, result);
}
@Override
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFutureTest.java
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFutureTest.java
index 615e25938b..09d03513eb 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFutureTest.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/DeadlineFutureTest.java
@@ -45,7 +45,7 @@ class DeadlineFutureTest {
DeadlineFuture success = DeadlineFuture.newFuture(service, method,
address, 1000,
ImmediateEventExecutor.INSTANCE);
AppResponse response = new AppResponse();
- success.received(TriRpcStatus.OK, () -> response);
+ success.received(TriRpcStatus.OK, response);
AppResponse response1 = success.get();
Assertions.assertEquals(response, response1);
}