This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch threadless-executor-enhancement
in repository https://gitbox.apache.org/repos/asf/dubbo.git

commit 69a78d73d42ff432935bf9306f06cac7561d3799
Author: ken.lj <[email protected]>
AuthorDate: Mon Dec 16 17:18:10 2019 +0800

    Enhance consumer side thread model: threadless executor
---
 .../common/threadpool/ThreadlessExecutor.java      | 21 +++++++++++--
 .../manager/DefaultExecutorRepository.java         | 13 +++++++++
 .../remoting/exchange/support/DefaultFuture.java   | 34 +++++++++++++++-------
 .../java/org/apache/dubbo/rpc/AsyncRpcResult.java  | 19 +++++++-----
 .../dubbo/rpc/protocol/AsyncToSyncInvoker.java     |  5 ++++
 5 files changed, 71 insertions(+), 21 deletions(-)

diff --git 
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/ThreadlessExecutor.java
 
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/ThreadlessExecutor.java
index 322d8d9..a0fc084 100644
--- 
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/ThreadlessExecutor.java
+++ 
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/ThreadlessExecutor.java
@@ -19,9 +19,11 @@ package org.apache.dubbo.common.threadpool;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.AbstractExecutorService;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -42,6 +44,8 @@ public class ThreadlessExecutor extends 
AbstractExecutorService {
 
     private ExecutorService sharedExecutor;
 
+    private CompletableFuture<?> waitingFuture;
+
     private volatile boolean waiting = true;
 
     private final Object lock = new Object();
@@ -50,6 +54,14 @@ public class ThreadlessExecutor extends 
AbstractExecutorService {
         this.sharedExecutor = sharedExecutor;
     }
 
+    public CompletableFuture<?> getWaitingFuture() {
+        return waitingFuture;
+    }
+
+    public void setWaitingFuture(CompletableFuture<?> waitingFuture) {
+        this.waitingFuture = waitingFuture;
+    }
+
     public boolean isWaiting() {
         return waiting;
     }
@@ -113,9 +125,10 @@ public class ThreadlessExecutor extends 
AbstractExecutorService {
     /**
      * tells the thread blocking on {@link #waitAndDrain()} to return, despite 
of the current status, to avoid endless waiting.
      */
-    public void notifyReturn() {
+    public void notifyReturn(Throwable t) {
         // an empty runnable task.
         execute(() -> {
+            waitingFuture.completeExceptionally(t);
         });
     }
 
@@ -125,12 +138,14 @@ public class ThreadlessExecutor extends 
AbstractExecutorService {
 
     @Override
     public void shutdown() {
-
+        shutdownNow();
     }
 
     @Override
     public List<Runnable> shutdownNow() {
-        return null;
+        notifyReturn(new IllegalStateException("Consumer is shutting down and 
this call is going to be stopped without " +
+                "receiving any result, usually this is called by a slow 
provider instance or bad service implementation."));
+        return Collections.emptyList();
     }
 
     @Override
diff --git 
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
 
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
index f0aab3c..dd37bff 100644
--- 
a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
+++ 
b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
@@ -64,6 +64,12 @@ public class DefaultExecutorRepository implements 
ExecutorRepository {
         serviceExporterExecutor = Executors.newScheduledThreadPool(1, new 
NamedThreadFactory("Dubbo-exporter-scheduler"));
     }
 
+    /**
+     * Get called when the server or client instance initiating.
+     *
+     * @param url
+     * @return
+     */
     public synchronized ExecutorService createExecutorIfAbsent(URL url) {
         String componentKey = EXECUTOR_SERVICE_COMPONENT_KEY;
         if (CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(SIDE_KEY))) {
@@ -87,7 +93,14 @@ public class DefaultExecutorRepository implements 
ExecutorRepository {
             componentKey = CONSUMER_SIDE;
         }
         Map<Integer, ExecutorService> executors = data.get(componentKey);
+
+        /**
+         * It's guaranteed that this method is called after {@link 
#createExecutorIfAbsent(URL)}, so data should already
+         * have Executor instances generated and stored.
+         */
         if (executors == null) {
+            logger.warn("No available executors, this is not expected, 
framework should call createExecutorIfAbsent first " +
+                    "before coming to here.");
             return null;
         }
 
diff --git 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
index aeaff45..46eb552 100644
--- 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
+++ 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
@@ -107,6 +107,10 @@ public class DefaultFuture extends 
CompletableFuture<Object> {
     public static DefaultFuture newFuture(Channel channel, Request request, 
int timeout, ExecutorService executor) {
         final DefaultFuture future = new DefaultFuture(channel, request, 
timeout);
         future.setExecutor(executor);
+        // ThreadlessExecutor needs to hold the waiting future in case of 
short-circuit return.
+        if (executor instanceof ThreadlessExecutor) {
+            ((ThreadlessExecutor) executor).setWaitingFuture(future);
+        }
         // timeout check
         timeoutCheck(future);
         return future;
@@ -138,6 +142,11 @@ public class DefaultFuture extends 
CompletableFuture<Object> {
             if (channel.equals(entry.getValue())) {
                 DefaultFuture future = getFuture(entry.getKey());
                 if (future != null && !future.isDone()) {
+                    ExecutorService futureExecutor = future.getExecutor();
+                    if (futureExecutor != null) {
+                        futureExecutor.shutdownNow();
+                    }
+
                     Response disconnectResponse = new Response(future.getId());
                     disconnectResponse.setStatus(Response.CHANNEL_INACTIVE);
                     disconnectResponse.setErrorMessage("Channel " +
@@ -208,7 +217,8 @@ public class DefaultFuture extends 
CompletableFuture<Object> {
         if (executor != null && executor instanceof ThreadlessExecutor) {
             ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) 
executor;
             if (threadlessExecutor.isWaiting()) {
-                threadlessExecutor.notifyReturn();
+                threadlessExecutor.notifyReturn(new IllegalStateException("The 
result has returned, but the biz thread is still waiting" +
+                        " which is not an expected state, interrupt the thread 
manually by returning an exception."));
             }
         }
     }
@@ -271,16 +281,20 @@ public class DefaultFuture extends 
CompletableFuture<Object> {
                 return;
             }
             if (future.getExecutor() != null) {
-                future.getExecutor().execute(() -> {
-                    // create exception response.
-                    Response timeoutResponse = new Response(future.getId());
-                    // set timeout status.
-                    timeoutResponse.setStatus(future.isSent() ? 
Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
-                    
timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
-                    // handle response.
-                    DefaultFuture.received(future.getChannel(), 
timeoutResponse, true);
-                });
+                future.getExecutor().execute(() -> notifyTimeout(future));
+            } else {
+                notifyTimeout(future);
             }
         }
+
+        private void notifyTimeout(DefaultFuture future) {
+            // create exception response.
+            Response timeoutResponse = new Response(future.getId());
+            // set timeout status.
+            timeoutResponse.setStatus(future.isSent() ? 
Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);
+            timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));
+            // handle response.
+            DefaultFuture.received(future.getChannel(), timeoutResponse, true);
+        }
     }
 }
diff --git 
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
 
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
index cb73c03..5a6e169 100644
--- 
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
+++ 
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AsyncRpcResult.java
@@ -96,8 +96,9 @@ public class AsyncRpcResult implements Result {
                 responseFuture.complete(appResponse);
             }
         } catch (Exception e) {
-            // This should never happen;
-            logger.error("Got exception when trying to change the value of the 
underlying result from AsyncRpcResult.", e);
+            // This should not happen in normal request process;
+            logger.error("Got exception when trying to fetch the underlying 
result from AsyncRpcResult.");
+            throw new RpcException(e);
         }
     }
 
@@ -117,8 +118,9 @@ public class AsyncRpcResult implements Result {
                 responseFuture.complete(appResponse);
             }
         } catch (Exception e) {
-            // This should never happen;
-            logger.error("Got exception when trying to change the value of the 
underlying result from AsyncRpcResult.", e);
+            // This should not happen in normal request process;
+            logger.error("Got exception when trying to fetch the underlying 
result from AsyncRpcResult.");
+            throw new RpcException(e);
         }
     }
 
@@ -141,8 +143,9 @@ public class AsyncRpcResult implements Result {
                 return responseFuture.get();
             }
         } catch (Exception e) {
-            // This should never happen;
-            logger.error("Got exception when trying to fetch the underlying 
result from AsyncRpcResult.", e);
+            // This should not happen in normal request process;
+            logger.error("Got exception when trying to fetch the underlying 
result from AsyncRpcResult.");
+            throw new RpcException(e);
         }
         return new AppResponse();
     }
@@ -158,7 +161,7 @@ public class AsyncRpcResult implements Result {
      */
     @Override
     public Result get() throws InterruptedException, ExecutionException {
-        if (executor != null) {
+        if (executor != null && executor instanceof ThreadlessExecutor) {
             ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) 
executor;
             threadlessExecutor.waitAndDrain();
         }
@@ -167,7 +170,7 @@ public class AsyncRpcResult implements Result {
 
     @Override
     public Result get(long timeout, TimeUnit unit) throws 
InterruptedException, ExecutionException, TimeoutException {
-        if (executor != null) {
+        if (executor != null && executor instanceof ThreadlessExecutor) {
             ThreadlessExecutor threadlessExecutor = (ThreadlessExecutor) 
executor;
             threadlessExecutor.waitAndDrain();
         }
diff --git 
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AsyncToSyncInvoker.java
 
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AsyncToSyncInvoker.java
index 0b28532..bc381d2 100644
--- 
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AsyncToSyncInvoker.java
+++ 
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AsyncToSyncInvoker.java
@@ -53,6 +53,11 @@ public class AsyncToSyncInvoker<T> implements Invoker<T> {
 
         try {
             if (InvokeMode.SYNC == ((RpcInvocation) 
invocation).getInvokeMode()) {
+                /**
+                 * NOTICE!
+                 * must call {@link 
java.util.concurrent.CompletableFuture#get(long, TimeUnit)} because
+                 * {@link java.util.concurrent.CompletableFuture#get()} has 
been shown to cause a serious performance drop.
+                 */
                 asyncResult.get(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
             }
         } catch (InterruptedException e) {

Reply via email to