This is an automated email from the ASF dual-hosted git repository. liujun pushed a commit to branch performance-tuning-2.7.x in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/performance-tuning-2.7.x by this push: new b2dbc5e DefaultFuture (#4257) b2dbc5e is described below commit b2dbc5ef5da92e3ee5eba06d64a8e00446dd65c7 Author: ken.lj <ken.lj...@gmail.com> AuthorDate: Wed Jun 5 18:41:40 2019 +0800 DefaultFuture (#4257) --- .../dubbo/demo/provider/DemoServiceImpl.java | 10 ------ .../apache/dubbo/registry/dubbo/MockChannel.java | 8 ++--- .../apache/dubbo/registry/dubbo/MockedClient.java | 14 ++++----- .../dubbo/remoting/exchange/ExchangeChannel.java | 8 ++--- .../remoting/exchange/support/DefaultFuture.java | 36 +++++++++++++--------- .../support/header/HeaderExchangeChannel.java | 18 +++++------ .../support/header/HeaderExchangeClient.java | 20 ++++++------ .../test/java/org/apache/dubbo/remoting/Main.java | 10 +++--- .../dubbo/remoting/PerformanceClientFixedTest.java | 5 ++- .../dubbo/remoting/PerformanceClientTest.java | 9 ++++-- .../exchange/support/DefaultFutureTest.java | 9 ++++-- .../support/header/HeaderExchangeChannelTest.java | 9 +++--- .../transport/mina/ClientToServerTest.java | 2 +- .../transport/netty/ClientToServerTest.java | 2 +- .../transport/netty4/ClientToServerTest.java | 2 +- .../rpc/protocol/dubbo/ChannelWrappedInvoker.java | 11 +------ .../dubbo/rpc/protocol/dubbo/DubboInvoker.java | 4 +-- .../protocol/dubbo/LazyConnectExchangeClient.java | 18 +++++------ .../dubbo/ReferenceCountExchangeClient.java | 16 +++++----- .../dubbo/rpc/protocol/thrift/ThriftInvoker.java | 4 +-- .../dubbo/rpc/protocol/thrift/ThriftCodecTest.java | 5 +-- 21 files changed, 108 insertions(+), 112 deletions(-) diff --git a/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/src/main/java/org/apache/dubbo/demo/provider/DemoServiceImpl.java b/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/src/main/java/org/apache/dubbo/demo/provider/DemoServiceImpl.java index 5e57f77..bceeae2 100644 --- a/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/src/main/java/org/apache/dubbo/demo/provider/DemoServiceImpl.java +++ b/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/src/main/java/org/apache/dubbo/demo/provider/DemoServiceImpl.java @@ -30,22 +30,12 @@ public class DemoServiceImpl implements DemoService { @Override public String sayHello(String name) { logger.info("Hello " + name + ", request from consumer: " + RpcContext.getContext().getRemoteAddress()); - try { - Thread.sleep(10000); - } catch (InterruptedException e) { - e.printStackTrace(); - } return "Hello " + name + ", response from provider: " + RpcContext.getContext().getLocalAddress(); } @Override public CompletableFuture<String> sayHelloAsync(String name) { CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - e.printStackTrace(); - } return "async result"; }); return cf; diff --git a/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockChannel.java b/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockChannel.java index e563da6..9c43359 100644 --- a/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockChannel.java +++ b/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockChannel.java @@ -86,21 +86,21 @@ public class MockChannel implements ExchangeChannel { return null; } - public CompletableFuture<Object> request(Object request) throws RemotingException { + public CompletableFuture<Object> request(Object request, CompletableFuture completableFuture) throws RemotingException { return null; } - public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException { + public CompletableFuture<Object> request(Object request, int timeout, CompletableFuture completableFuture) throws RemotingException { return null; } @Override - public CompletableFuture<Object> request(Object request, ExecutorService executor) throws RemotingException { + public CompletableFuture<Object> request(Object request, ExecutorService executor, CompletableFuture completableFuture) throws RemotingException { return null; } @Override - public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException { + public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor, CompletableFuture completableFuture) throws RemotingException { return null; } diff --git a/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockedClient.java b/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockedClient.java index ad06cd2..bd62d3a 100644 --- a/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockedClient.java +++ b/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/MockedClient.java @@ -81,21 +81,21 @@ public class MockedClient implements ExchangeClient { this.sent = msg; } - public CompletableFuture<Object> request(Object msg) throws RemotingException { - return request(msg, null); + public CompletableFuture<Object> request(Object msg, CompletableFuture completableFuture) throws RemotingException { + return request(msg, null, completableFuture); } - public CompletableFuture<Object> request(Object msg, int timeout) throws RemotingException { - return this.request(msg, timeout, null); + public CompletableFuture<Object> request(Object msg, int timeout, CompletableFuture completableFuture) throws RemotingException { + return this.request(msg, timeout, null, completableFuture); } @Override - public CompletableFuture<Object> request(Object msg, ExecutorService executor) throws RemotingException { - return this.request(msg, 0, executor); + public CompletableFuture<Object> request(Object msg, ExecutorService executor, CompletableFuture completableFuture) throws RemotingException { + return this.request(msg, 0, executor, completableFuture); } @Override - public CompletableFuture<Object> request(Object msg, int timeout, ExecutorService executor) throws RemotingException { + public CompletableFuture<Object> request(Object msg, int timeout, ExecutorService executor, CompletableFuture completableFuture) throws RemotingException { this.invoked = msg; return new CompletableFuture<Object>() { public Object get() throws InterruptedException, ExecutionException { diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/ExchangeChannel.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/ExchangeChannel.java index c0cf131..48a9f6f 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/ExchangeChannel.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/ExchangeChannel.java @@ -35,7 +35,7 @@ public interface ExchangeChannel extends Channel { * @throws RemotingException */ @Deprecated - CompletableFuture<Object> request(Object request) throws RemotingException; + CompletableFuture<Object> request(Object request, CompletableFuture completableFuture) throws RemotingException; /** * send request. @@ -46,7 +46,7 @@ public interface ExchangeChannel extends Channel { * @throws RemotingException */ @Deprecated - CompletableFuture<Object> request(Object request, int timeout) throws RemotingException; + CompletableFuture<Object> request(Object request, int timeout, CompletableFuture completableFuture) throws RemotingException; /** * send request. @@ -55,7 +55,7 @@ public interface ExchangeChannel extends Channel { * @return response future * @throws RemotingException */ - CompletableFuture<Object> request(Object request, ExecutorService executor) throws RemotingException; + CompletableFuture<Object> request(Object request, ExecutorService executor, CompletableFuture completableFuture) throws RemotingException; /** * send request. @@ -65,7 +65,7 @@ public interface ExchangeChannel extends Channel { * @return response future * @throws RemotingException */ - CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException; + CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor, CompletableFuture completableFuture) throws RemotingException; /** * get message handler. diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java index 0abecf2..a23e8cd 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java @@ -35,6 +35,7 @@ import java.util.Date; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -44,7 +45,7 @@ import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY; /** * DefaultFuture. */ -public class DefaultFuture extends CompletableFuture<Object> { +public class DefaultFuture { private static final Logger logger = LoggerFactory.getLogger(DefaultFuture.class); @@ -52,6 +53,9 @@ public class DefaultFuture extends CompletableFuture<Object> { private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<>(); + // private static final Map<Long, Timeout> PENDING_TASKS = new ConcurrentHashMap<>(); + private final CompletableFuture completableFuture; + public static final Timer TIME_OUT_TIMER = new HashedWheelTimer( new NamedThreadFactory("dubbo-future-timeout", true), 30, @@ -66,20 +70,18 @@ public class DefaultFuture extends CompletableFuture<Object> { private volatile long sent; private Timeout timeoutCheckTask; - private ExecutorService executor; + private final ExecutorService executor; public ExecutorService getExecutor() { return executor; } - public void setExecutor(ExecutorService executor) { - this.executor = executor; - } - - private DefaultFuture(Channel channel, Request request, int timeout) { + private DefaultFuture(Channel channel, Request request, int timeout, ExecutorService executor, CompletableFuture completableFuture) { this.channel = channel; this.request = request; this.id = request.getId(); + this.executor = executor; + this.completableFuture = completableFuture; this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT); // put into waiting map. FUTURES.put(id, this); @@ -104,9 +106,8 @@ public class DefaultFuture extends CompletableFuture<Object> { * @param timeout timeout * @return a new DefaultFuture */ - public static DefaultFuture newFuture(Channel channel, Request request, int timeout, ExecutorService executor) { - final DefaultFuture future = new DefaultFuture(channel, request, timeout); - future.setExecutor(executor); + public static DefaultFuture newFuture(Channel channel, Request request, int timeout, ExecutorService executor, CompletableFuture completableFuture) { + final DefaultFuture future = new DefaultFuture(channel, request, timeout, executor, completableFuture); // timeout check timeoutCheck(future); return future; @@ -176,7 +177,6 @@ public class DefaultFuture extends CompletableFuture<Object> { } } - @Override public boolean cancel(boolean mayInterruptIfRunning) { Response errorResult = new Response(id); errorResult.setStatus(Response.CLIENT_ERROR); @@ -187,6 +187,14 @@ public class DefaultFuture extends CompletableFuture<Object> { return true; } + public boolean isDone() { + return this.completableFuture.isDone(); + } + + public Object get() throws ExecutionException, InterruptedException { + return this.completableFuture.get(); + } + public void cancel() { this.cancel(true); } @@ -196,11 +204,11 @@ public class DefaultFuture extends CompletableFuture<Object> { throw new IllegalStateException("response cannot be null"); } if (res.getStatus() == Response.OK) { - this.complete(res.getResult()); + this.completableFuture.complete(res.getResult()); } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) { - this.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage())); + this.completableFuture.completeExceptionally(new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage())); } else { - this.completeExceptionally(new RemotingException(channel, res.getErrorMessage())); + this.completableFuture.completeExceptionally(new RemotingException(channel, res.getErrorMessage())); } // the result is returning, but the caller thread may still waiting diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannel.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannel.java index b954612..a82bf53 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannel.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannel.java @@ -106,22 +106,22 @@ final class HeaderExchangeChannel implements ExchangeChannel { } @Override - public CompletableFuture<Object> request(Object request) throws RemotingException { - return request(request, null); + public CompletableFuture<Object> request(Object request, CompletableFuture completableFuture) throws RemotingException { + return request(request, null, completableFuture); } @Override - public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException { - return request(request, timeout, null); + public CompletableFuture<Object> request(Object request, int timeout, CompletableFuture completableFuture) throws RemotingException { + return request(request, timeout, null, completableFuture); } @Override - public CompletableFuture<Object> request(Object request, ExecutorService executor) throws RemotingException { - return request(request, channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT), executor); + public CompletableFuture<Object> request(Object request, ExecutorService executor, CompletableFuture completableFuture) throws RemotingException { + return request(request, channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT), executor, completableFuture); } @Override - public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException { + public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor, CompletableFuture completableFuture) throws RemotingException { if (closed) { throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!"); } @@ -130,14 +130,14 @@ final class HeaderExchangeChannel implements ExchangeChannel { req.setVersion(Version.getProtocolVersion()); req.setTwoWay(true); req.setData(request); - DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor); + DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor, completableFuture); try { channel.send(req); } catch (RemotingException e) { future.cancel(); throw e; } - return future; + return completableFuture; } @Override diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java index 6671d1a..8e04b99 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeClient.java @@ -34,11 +34,11 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; -import static org.apache.dubbo.remoting.utils.UrlUtils.getHeartbeat; -import static org.apache.dubbo.remoting.utils.UrlUtils.getIdleTimeout; import static org.apache.dubbo.remoting.Constants.HEARTBEAT_CHECK_TICK; import static org.apache.dubbo.remoting.Constants.LEAST_HEARTBEAT_DURATION; import static org.apache.dubbo.remoting.Constants.TICKS_PER_WHEEL; +import static org.apache.dubbo.remoting.utils.UrlUtils.getHeartbeat; +import static org.apache.dubbo.remoting.utils.UrlUtils.getIdleTimeout; /** * DefaultMessageClient @@ -66,8 +66,8 @@ public class HeaderExchangeClient implements ExchangeClient { } @Override - public CompletableFuture<Object> request(Object request) throws RemotingException { - return channel.request(request); + public CompletableFuture<Object> request(Object request, CompletableFuture completableFuture) throws RemotingException { + return channel.request(request, completableFuture); } @Override @@ -81,18 +81,18 @@ public class HeaderExchangeClient implements ExchangeClient { } @Override - public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException { - return channel.request(request, timeout); + public CompletableFuture<Object> request(Object request, int timeout, CompletableFuture completableFuture) throws RemotingException { + return channel.request(request, timeout, completableFuture); } @Override - public CompletableFuture<Object> request(Object request, ExecutorService executor) throws RemotingException { - return channel.request(request, executor); + public CompletableFuture<Object> request(Object request, ExecutorService executor, CompletableFuture completableFuture) throws RemotingException { + return channel.request(request, executor, completableFuture); } @Override - public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException { - return channel.request(request, timeout, executor); + public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor, CompletableFuture completableFuture) throws RemotingException { + return channel.request(request, timeout, executor, completableFuture); } @Override diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/Main.java b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/Main.java index 4478f86..9869bcc 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/Main.java +++ b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/Main.java @@ -62,7 +62,7 @@ public class Main { sb.append("(" + random.nextLong() + ")"); Main.Data d = new Main.Data(); d.setData(sb.toString()); - client.request(d).get(); + client.request(d, new CompletableFuture()).get(); } System.out.println("send finished."); } @@ -83,18 +83,18 @@ public class Main { private static void test(int port) throws Exception { ExchangeChannel client = Exchangers.connect(URL.valueOf("dubbo://localhost:" + port)); - MockResult result = (MockResult) client.request(new RpcMessage(DemoService.class.getName(), "plus", new Class<?>[]{int.class, int.class}, new Object[]{55, 25})).get(); + MockResult result = (MockResult) client.request(new RpcMessage(DemoService.class.getName(), "plus", new Class<?>[]{int.class, int.class}, new Object[]{55, 25}), new CompletableFuture()).get(); System.out.println("55+25=" + result.getResult()); for (int i = 0; i < 100; i++) - client.request(new RpcMessage(DemoService.class.getName(), "sayHello", new Class<?>[]{String.class}, new Object[]{"qianlei" + i})); + client.request(new RpcMessage(DemoService.class.getName(), "sayHello", new Class<?>[]{String.class}, new Object[]{"qianlei" + i}), new CompletableFuture()); for (int i = 0; i < 100; i++) - client.request(new Main.Data()); + client.request(new Main.Data(), new CompletableFuture()); System.out.println("=====test invoke====="); for (int i = 0; i < 100; i++) { - CompletableFuture<Object> future = client.request(new Main.Data()); + CompletableFuture<Object> future = client.request(new Main.Data(), new CompletableFuture()); System.out.println("invoke and get"); System.out.println("invoke result:" + future.get()); } diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/PerformanceClientFixedTest.java b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/PerformanceClientFixedTest.java index 93ccc76..ab0228f 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/PerformanceClientFixedTest.java +++ b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/PerformanceClientFixedTest.java @@ -25,6 +25,7 @@ import org.junit.jupiter.api.Test; import java.util.ArrayList; import java.util.Random; +import java.util.concurrent.CompletableFuture; import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT; import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY; @@ -120,7 +121,9 @@ public class PerformanceClientFixedTest { int index = rd.nextInt(connectionCount); ExchangeClient client = arrays.get(index); // ExchangeClient client = arrays.get(0); - String output = (String) client.request(messageBlock).get(); + CompletableFuture cf = new CompletableFuture(); + client.request(messageBlock, cf); + String output = (String) cf.get(); if (output.lastIndexOf(messageBlock) < 0) { System.out.println("send messageBlock;get " + output); diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/PerformanceClientTest.java b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/PerformanceClientTest.java index f4dfbc2..b86b17d 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/PerformanceClientTest.java +++ b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/PerformanceClientTest.java @@ -28,6 +28,7 @@ import java.text.DecimalFormat; import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -72,8 +73,9 @@ public class PerformanceClientTest { exchangeClients[i] = Exchangers.connect(url); } - List<String> serverEnvironment = (List<String>) exchangeClients[0].request("environment").get(); - List<String> serverScene = (List<String>) exchangeClients[0].request("scene").get(); + CompletableFuture<Object> completableFuture = new CompletableFuture<>(); + List<String> serverEnvironment = (List<String>) exchangeClients[0].request("environment", completableFuture).get(); + List<String> serverScene = (List<String>) exchangeClients[0].request("scene", completableFuture).get(); // Create some data for test StringBuilder buf = new StringBuilder(length); @@ -101,7 +103,8 @@ public class PerformanceClientTest { count.incrementAndGet(); ExchangeClient client = exchangeClients[index.getAndIncrement() % connections]; long start = System.currentTimeMillis(); - String result = (String) client.request(data).get(); + CompletableFuture<Object> completableFuture = new CompletableFuture<>(); + String result = (String) client.request(data, completableFuture).get(); long end = System.currentTimeMillis(); if (!data.equals(result)) { throw new IllegalStateException("Invalid result " + result); diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java index 0f19d15..01db770 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java +++ b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java @@ -27,6 +27,7 @@ import org.junit.jupiter.api.Test; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; public class DefaultFutureTest { @@ -95,7 +96,8 @@ public class DefaultFutureTest { // timeout after 5 seconds. Channel channel = new MockedChannel(); Request request = new Request(10); - DefaultFuture f = DefaultFuture.newFuture(channel, request, 5000, null); + CompletableFuture cf = new CompletableFuture(); + DefaultFuture f = DefaultFuture.newFuture(channel, request, 5000, null, cf); //mark the future is sent DefaultFuture.sent(channel, request); while (!f.isDone()) { @@ -106,7 +108,7 @@ public class DefaultFutureTest { // get operate will throw a timeout exception, because the future is timeout. try { - f.get(); + cf.get(); } catch (Exception e) { Assertions.assertTrue(e.getCause() instanceof TimeoutException, "catch exception is not timeout exception!"); System.out.println(e.getMessage()); @@ -119,7 +121,8 @@ public class DefaultFutureTest { private DefaultFuture defaultFuture(int timeout) { Channel channel = new MockedChannel(); Request request = new Request(index.getAndIncrement()); - return DefaultFuture.newFuture(channel, request, timeout, null); + CompletableFuture<Object> completableFuture = new CompletableFuture<>(); + return DefaultFuture.newFuture(channel, request, timeout, null, completableFuture); } } \ No newline at end of file diff --git a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannelTest.java b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannelTest.java index 2affe7d..873540d 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannelTest.java +++ b/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchangeChannelTest.java @@ -29,6 +29,7 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import java.util.List; +import java.util.concurrent.CompletableFuture; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -143,7 +144,7 @@ public class HeaderExchangeChannelTest { Assertions.assertThrows(RemotingException.class, () -> { header.close(1000); Object requestob = new Object(); - header.request(requestob); + header.request(requestob, CompletableFuture.completedFuture(0)); }); } @@ -153,7 +154,7 @@ public class HeaderExchangeChannelTest { header = new HeaderExchangeChannel(channel); when(channel.getUrl()).thenReturn(url); Object requestob = new Object(); - header.request(requestob); + header.request(requestob, CompletableFuture.completedFuture(0)); ArgumentCaptor<Request> argumentCaptor = ArgumentCaptor.forClass(Request.class); verify(channel, times(1)).send(argumentCaptor.capture()); Assertions.assertEquals(argumentCaptor.getValue().getData(), requestob); @@ -170,7 +171,7 @@ public class HeaderExchangeChannelTest { }; header = new HeaderExchangeChannel(channel); Object requestob = new Object(); - header.request(requestob, 1000); + header.request(requestob, 1000, CompletableFuture.completedFuture(0)); }); } @@ -191,7 +192,7 @@ public class HeaderExchangeChannelTest { public void closeWithTimeoutTest02() { Assertions.assertFalse(channel.isClosed()); Request request = new Request(); - DefaultFuture.newFuture(channel, request, 100, null); + DefaultFuture.newFuture(channel, request, 100, null, CompletableFuture.completedFuture(0)); header.close(100); //return directly header.close(1000); diff --git a/dubbo-remoting/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/ClientToServerTest.java b/dubbo-remoting/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/ClientToServerTest.java index 6414434..bea1860 100644 --- a/dubbo-remoting/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/ClientToServerTest.java +++ b/dubbo-remoting/dubbo-remoting-mina/src/test/java/org/apache/remoting/transport/mina/ClientToServerTest.java @@ -65,7 +65,7 @@ public abstract class ClientToServerTest { @Test public void testFuture() throws Exception { - CompletableFuture<Object> future = client.request(new World("world")); + CompletableFuture<Object> future = client.request(new World("world"), new CompletableFuture()); Hello result = (Hello) future.get(); Assertions.assertEquals("hello,world", result.getName()); } diff --git a/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/ClientToServerTest.java b/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/ClientToServerTest.java index 267f569..45ecd37 100644 --- a/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/ClientToServerTest.java +++ b/dubbo-remoting/dubbo-remoting-netty/src/test/java/org/apache/dubbo/remoting/transport/netty/ClientToServerTest.java @@ -64,7 +64,7 @@ public abstract class ClientToServerTest { @Test public void testFuture() throws Exception { - CompletableFuture<Object> future = client.request(new World("world")); + CompletableFuture<Object> future = client.request(new World("world"), new CompletableFuture()); Hello result = (Hello) future.get(); Assertions.assertEquals("hello,world", result.getName()); } diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ClientToServerTest.java b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ClientToServerTest.java index 9b8db00..a67f92b 100644 --- a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ClientToServerTest.java +++ b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/ClientToServerTest.java @@ -65,7 +65,7 @@ public abstract class ClientToServerTest { @Test public void testFuture() throws Exception { - CompletableFuture<Object> future = client.request(new World("world")); + CompletableFuture<Object> future = client.request(new World("world"), new CompletableFuture()); Hello result = (Hello) future.get(); Assertions.assertEquals("hello,world", result.getName()); } diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ChannelWrappedInvoker.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ChannelWrappedInvoker.java index 1018512..6758d04 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ChannelWrappedInvoker.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ChannelWrappedInvoker.java @@ -24,7 +24,6 @@ import org.apache.dubbo.remoting.TimeoutException; import org.apache.dubbo.remoting.exchange.ExchangeClient; import org.apache.dubbo.remoting.exchange.support.header.HeaderExchangeClient; import org.apache.dubbo.remoting.transport.ClientDelegate; -import org.apache.dubbo.rpc.AppResponse; import org.apache.dubbo.rpc.AsyncRpcResult; import org.apache.dubbo.rpc.Invocation; import org.apache.dubbo.rpc.Result; @@ -34,7 +33,6 @@ import org.apache.dubbo.rpc.protocol.AbstractInvoker; import org.apache.dubbo.rpc.support.RpcUtils; import java.net.InetSocketAddress; -import java.util.concurrent.CompletableFuture; import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY; import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY; @@ -72,15 +70,8 @@ class ChannelWrappedInvoker<T> extends AbstractInvoker<T> { currentClient.send(inv, getUrl().getMethodParameter(invocation.getMethodName(), SENT_KEY, false)); return AsyncRpcResult.newDefaultAsyncResult(invocation); } else { - CompletableFuture<Object> responseFuture = currentClient.request(inv); AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv); - responseFuture.whenComplete((appResponse, t) -> { - if (t != null) { - asyncRpcResult.completeExceptionally(t); - } else { - asyncRpcResult.complete((AppResponse) appResponse); - } - }); + currentClient.request(inv, asyncRpcResult); return asyncRpcResult; } } catch (RpcException e) { diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java index 4cf6259..ea7cb23 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java @@ -34,7 +34,6 @@ import org.apache.dubbo.rpc.protocol.AbstractInvoker; import org.apache.dubbo.rpc.support.RpcUtils; import java.util.Set; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.locks.ReentrantLock; @@ -98,8 +97,7 @@ public class DubboInvoker<T> extends AbstractInvoker<T> { AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv); ExecutorService executor = getCallbackExecutor(getUrl(), inv); asyncRpcResult.setExecutor(executor); - CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout, executor); - asyncRpcResult.subscribeTo(responseFuture); + currentClient.request(inv, timeout, executor, asyncRpcResult); return asyncRpcResult; } } catch (TimeoutException e) { diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java index 77eae2c..ffc3dfe 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/LazyConnectExchangeClient.java @@ -35,8 +35,8 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import static org.apache.dubbo.remoting.Constants.SEND_RECONNECT_KEY; -import static org.apache.dubbo.rpc.protocol.dubbo.Constants.LAZY_CONNECT_INITIAL_STATE_KEY; import static org.apache.dubbo.rpc.protocol.dubbo.Constants.DEFAULT_LAZY_CONNECT_INITIAL_STATE; +import static org.apache.dubbo.rpc.protocol.dubbo.Constants.LAZY_CONNECT_INITIAL_STATE_KEY; /** * dubbo protocol support class. @@ -88,10 +88,10 @@ final class LazyConnectExchangeClient implements ExchangeClient { } @Override - public CompletableFuture<Object> request(Object request) throws RemotingException { + public CompletableFuture<Object> request(Object request, CompletableFuture completableFuture) throws RemotingException { warning(); initClient(); - return client.request(request); + return client.request(request, completableFuture); } @Override @@ -109,24 +109,24 @@ final class LazyConnectExchangeClient implements ExchangeClient { } @Override - public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException { + public CompletableFuture<Object> request(Object request, int timeout, CompletableFuture completableFuture) throws RemotingException { warning(); initClient(); - return client.request(request, timeout); + return client.request(request, timeout, completableFuture); } @Override - public CompletableFuture<Object> request(Object request, ExecutorService executor) throws RemotingException { + public CompletableFuture<Object> request(Object request, ExecutorService executor, CompletableFuture completableFuture) throws RemotingException { warning(); initClient(); - return client.request(request, executor); + return client.request(request, executor, completableFuture); } @Override - public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException { + public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor, CompletableFuture completableFuture) throws RemotingException { warning(); initClient(); - return client.request(request, timeout, executor); + return client.request(request, timeout, executor, completableFuture); } /** diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java index ed3c61b..c25ed2c 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/ReferenceCountExchangeClient.java @@ -57,8 +57,8 @@ final class ReferenceCountExchangeClient implements ExchangeClient { } @Override - public CompletableFuture<Object> request(Object request) throws RemotingException { - return client.request(request); + public CompletableFuture<Object> request(Object request, CompletableFuture completableFuture) throws RemotingException { + return client.request(request, completableFuture); } @Override @@ -77,18 +77,18 @@ final class ReferenceCountExchangeClient implements ExchangeClient { } @Override - public CompletableFuture<Object> request(Object request, int timeout) throws RemotingException { - return client.request(request, timeout); + public CompletableFuture<Object> request(Object request, int timeout, CompletableFuture completableFuture) throws RemotingException { + return client.request(request, timeout, completableFuture); } @Override - public CompletableFuture<Object> request(Object request, ExecutorService executor) throws RemotingException { - return client.request(request, executor); + public CompletableFuture<Object> request(Object request, ExecutorService executor, CompletableFuture completableFuture) throws RemotingException { + return client.request(request, executor, completableFuture); } @Override - public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException { - return client.request(request, timeout, executor); + public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor, CompletableFuture completableFuture) throws RemotingException { + return client.request(request, timeout, executor, completableFuture); } @Override diff --git a/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftInvoker.java b/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftInvoker.java index 3d0aee9..0125ddd 100644 --- a/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftInvoker.java +++ b/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftInvoker.java @@ -31,7 +31,6 @@ import org.apache.dubbo.rpc.RpcInvocation; import org.apache.dubbo.rpc.protocol.AbstractInvoker; import java.util.Set; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.locks.ReentrantLock; import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT; @@ -93,8 +92,7 @@ public class ThriftInvoker<T> extends AbstractInvoker<T> { int timeout = getUrl().getMethodParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT); AsyncRpcResult asyncRpcResult = new AsyncRpcResult(invocation); - CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout); - asyncRpcResult.subscribeTo(responseFuture); + currentClient.request(inv, timeout, asyncRpcResult); return asyncRpcResult; } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, e.getMessage(), e); diff --git a/dubbo-rpc/dubbo-rpc-thrift/src/test/java/org/apache/dubbo/rpc/protocol/thrift/ThriftCodecTest.java b/dubbo-rpc/dubbo-rpc-thrift/src/test/java/org/apache/dubbo/rpc/protocol/thrift/ThriftCodecTest.java index 36fbe6c..405550a 100644 --- a/dubbo-rpc/dubbo-rpc-thrift/src/test/java/org/apache/dubbo/rpc/protocol/thrift/ThriftCodecTest.java +++ b/dubbo-rpc/dubbo-rpc-thrift/src/test/java/org/apache/dubbo/rpc/protocol/thrift/ThriftCodecTest.java @@ -40,6 +40,7 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import java.io.ByteArrayInputStream; +import java.util.concurrent.CompletableFuture; import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY; import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY; @@ -133,7 +134,7 @@ public class ThriftCodecTest { Request request = createRequest(); - DefaultFuture future = DefaultFuture.newFuture(channel, request, 10, null); + DefaultFuture future = DefaultFuture.newFuture(channel, request, 10, null, new CompletableFuture()); TMessage message = new TMessage("echoString", TMessageType.REPLY, ThriftCodec.getSeqId()); @@ -210,7 +211,7 @@ public class ThriftCodecTest { Request request = createRequest(); - DefaultFuture future = DefaultFuture.newFuture(channel, request, 10, null); + DefaultFuture future = DefaultFuture.newFuture(channel, request, 10, null, new CompletableFuture()); TMessage message = new TMessage("echoString", TMessageType.EXCEPTION, ThriftCodec.getSeqId());