Use LinkedBlockingQueue instead of ArrayBlockingQueue for thread-pool work queues, for better performance
Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/489b1d8b Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/489b1d8b Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/489b1d8b Branch: refs/heads/rocketmq5 Commit: 489b1d8b7829990bd5b6ecef1ab5aeae685a1ec4 Parents: 114b6ae Author: yukon <[email protected]> Authored: Wed Sep 20 17:18:43 2017 +0800 Committer: yukon <[email protected]> Committed: Wed Sep 20 17:18:43 2017 +0800 ---------------------------------------------------------------------- .../impl/netty/NettyRemotingAbstract.java | 3 +-- .../rpc/impl/client/SimpleClientImpl.java | 12 +++++++---- .../rpc/impl/server/SimpleServerImpl.java | 11 ++++++---- .../rpc/impl/service/RpcInstanceAbstract.java | 12 +++++++---- .../rpc/impl/service/RpcProxyCommon.java | 22 +++++++++++++------- 5 files changed, 39 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/489b1d8b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java ---------------------------------------------------------------------- diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java index 4c22e7c..a5c2118 100644 --- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java +++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/NettyRemotingAbstract.java @@ -25,7 +25,6 @@ import io.netty.channel.SimpleChannelInboundHandler; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.Map; -import 
java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; @@ -98,7 +97,7 @@ public abstract class NettyRemotingAbstract implements RemotingService { clientConfig.getClientAsyncCallbackExecutorThreads(), 60, TimeUnit.SECONDS, - new ArrayBlockingQueue<Runnable>(10000), + new LinkedBlockingQueue<Runnable>(10000), "PublicExecutor", true); this.remotingCommandFactory = new RemotingCommandFactoryImpl(remotingCommandFactoryMeta); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/489b1d8b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java ---------------------------------------------------------------------- diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java index 787e8c1..4483ca3 100644 --- a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/client/SimpleClientImpl.java @@ -18,8 +18,8 @@ package org.apache.rocketmq.rpc.impl.client; import java.util.Properties; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.remoting.api.RemotingClient; import org.apache.rocketmq.remoting.api.RemotingService; @@ -48,9 +48,13 @@ public class SimpleClientImpl extends RpcInstanceAbstract implements SimpleClien super(rpcCommonConfig); this.remotingClient = remotingClient; this.rpcCommonConfig = rpcCommonConfig; - this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor(rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), - rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), 
rpcCommonConfig.getServiceThreadKeepAliveTime(), - TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()), "clientCallServiceThread", true); + this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor( + rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), + rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), + rpcCommonConfig.getServiceThreadKeepAliveTime(), + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()), + "clientCallServiceThread", true); } public void initialize() { http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/489b1d8b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java ---------------------------------------------------------------------- diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java index e076cbe..1fdda49 100644 --- a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/server/SimpleServerImpl.java @@ -18,8 +18,8 @@ package org.apache.rocketmq.rpc.impl.server; import java.util.Properties; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.remoting.api.RemotingServer; import org.apache.rocketmq.remoting.api.RemotingService; @@ -40,9 +40,12 @@ public class SimpleServerImpl extends RpcInstanceAbstract implements SimpleServe public SimpleServerImpl(final RpcCommonConfig remotingConfig) { this(remotingConfig, RemotingBootstrapFactory.createRemotingServer(remotingConfig)); this.rpcCommonConfig = remotingConfig; - this.callServiceThreadPool = 
ThreadUtils.newThreadPoolExecutor(rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), - rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), rpcCommonConfig.getServiceThreadKeepAliveTime(), - TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(remotingConfig.getServiceThreadBlockQueueSize()), + this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor( + rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), + rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), + rpcCommonConfig.getServiceThreadKeepAliveTime(), + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<Runnable>(remotingConfig.getServiceThreadBlockQueueSize()), "serverCallServiceThread", true); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/489b1d8b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java ---------------------------------------------------------------------- diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java index 2b1288c..8c0ddf2 100644 --- a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcInstanceAbstract.java @@ -17,8 +17,8 @@ package org.apache.rocketmq.rpc.impl.service; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.remoting.external.ThreadUtils; @@ -45,9 +45,13 @@ public abstract class RpcInstanceAbstract extends RpcProxyCommon { this.rpcCommonConfig = rpcCommonConfig; this.rpcRequestProcessor = new RpcRequestProcessor(this.threadLocalProviderContext, this, serviceStats); - this.invokeServiceThreadPool = 
new ThreadPoolExecutor(rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), - rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), 60, TimeUnit.SECONDS, - new ArrayBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()), newThreadFactory("rpcInvokeServiceThread", true)); + this.invokeServiceThreadPool = new ThreadPoolExecutor( + rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), + rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), + 60, + TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()), + newThreadFactory("rpcInvokeServiceThread", true)); } http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/489b1d8b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcProxyCommon.java ---------------------------------------------------------------------- diff --git a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcProxyCommon.java b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcProxyCommon.java index c5d9a3c..2487f79 100644 --- a/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcProxyCommon.java +++ b/remoting-core/rpc-impl/src/main/java/org/apache/rocketmq/rpc/impl/service/RpcProxyCommon.java @@ -21,11 +21,11 @@ import java.lang.reflect.Method; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; import java.nio.ByteBuffer; -import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import org.apache.rocketmq.remoting.api.AsyncHandler; import org.apache.rocketmq.remoting.api.RemotingService; @@ -68,12 +68,20 @@ public abstract class RpcProxyCommon { public RpcProxyCommon(RpcCommonConfig rpcCommonConfig) { 
this.rpcCommonConfig = rpcCommonConfig; this.serviceStats = new ServiceStats(); - this.promiseExecutorService = ThreadUtils.newThreadPoolExecutor(rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), - rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), rpcCommonConfig.getServiceThreadKeepAliveTime(), - TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()), "promiseExecutorService", true); - this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor(rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), - rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), rpcCommonConfig.getServiceThreadKeepAliveTime(), - TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()), "callServiceThread", true); + this.promiseExecutorService = ThreadUtils.newThreadPoolExecutor( + rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), + rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), + rpcCommonConfig.getServiceThreadKeepAliveTime(), + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()), + "promiseExecutorService", true); + this.callServiceThreadPool = ThreadUtils.newThreadPoolExecutor( + rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), + rpcCommonConfig.getClientAsyncCallbackExecutorThreads(), + rpcCommonConfig.getServiceThreadKeepAliveTime(), + TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<Runnable>(rpcCommonConfig.getServiceThreadBlockQueueSize()), + "callServiceThread", true); } private RemotingCommand createRemoteRequest(RemoteService serviceExport, Method method, Object[] args,
