ymwneu closed pull request #580: Change async send thread pool
URL: https://github.com/apache/rocketmq/pull/580
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it is merged, the diff is reproduced below for
the sake of provenance:
diff --git
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 7ace9d5b0..f21850c48 100644
---
a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++
b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -30,8 +30,10 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.common.ClientErrorCode;
@@ -98,6 +100,9 @@
private MQClientInstance mQClientFactory;
private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new
ArrayList<CheckForbiddenHook>();
private int zipCompressLevel =
Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));
+ private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
+ private final ExecutorService defaultAsyncSenderExecutor;
+ private ExecutorService asyncSenderExecutor;
private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();
@@ -108,6 +113,22 @@ public DefaultMQProducerImpl(final DefaultMQProducer
defaultMQProducer) {
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer,
RPCHook rpcHook) {
this.defaultMQProducer = defaultMQProducer;
this.rpcHook = rpcHook;
+
+ this.asyncSenderThreadPoolQueue = new
LinkedBlockingQueue<Runnable>(50000);
+ this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
+ 4,
+ 4,
+ 1000 * 60,
+ TimeUnit.MILLISECONDS,
+ this.asyncSenderThreadPoolQueue,
+ new ThreadFactory() {
+ private AtomicInteger threadIndex = new AtomicInteger(0);
+
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "AsyncSenderExecutor_" +
this.threadIndex.incrementAndGet());
+ }
+ });
}
public void registerCheckForbiddenHook(CheckForbiddenHook
checkForbiddenHook) {
@@ -456,7 +477,7 @@ public void send(Message msg,
public void send(final Message msg, final SendCallback sendCallback, final
long timeout)
throws MQClientException, RemotingException, InterruptedException {
final long beginStartTime = System.currentTimeMillis();
- ExecutorService executor = this.getCallbackExecutor();
+ ExecutorService executor = this.getAsyncSenderExecutor();
try {
executor.submit(new Runnable() {
@Override
@@ -957,7 +978,7 @@ public void send(Message msg, MessageQueue mq, SendCallback
sendCallback)
public void send(final Message msg, final MessageQueue mq, final
SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException {
final long beginStartTime = System.currentTimeMillis();
- ExecutorService executor = this.getCallbackExecutor();
+ ExecutorService executor = this.getAsyncSenderExecutor();
try {
executor.submit(new Runnable() {
@Override
@@ -1079,7 +1100,7 @@ public void send(Message msg, MessageQueueSelector
selector, Object arg, SendCal
public void send(final Message msg, final MessageQueueSelector selector,
final Object arg, final SendCallback sendCallback, final long timeout)
throws MQClientException, RemotingException, InterruptedException {
final long beginStartTime = System.currentTimeMillis();
- ExecutorService executor = this.getCallbackExecutor();
+ ExecutorService executor = this.getAsyncSenderExecutor();
try {
executor.submit(new Runnable() {
@Override
@@ -1243,9 +1264,13 @@ public void endTransaction(
public void setCallbackExecutor(final ExecutorService callbackExecutor) {
this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().setCallbackExecutor(callbackExecutor);
}
- public ExecutorService getCallbackExecutor() {
- return
this.mQClientFactory.getMQClientAPIImpl().getRemotingClient().getCallbackExecutor();
+ public ExecutorService getAsyncSenderExecutor() {
+ return null == asyncSenderExecutor ? defaultAsyncSenderExecutor :
asyncSenderExecutor;
+ }
+
+ public void setAsyncSenderExecutor(ExecutorService asyncSenderExecutor) {
+ this.asyncSenderExecutor = asyncSenderExecutor;
}
public SendResult send(Message msg,
diff --git
a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index 9732d0eb8..2385b46ba 100644
---
a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++
b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -655,6 +655,16 @@ public void setCallbackExecutor(final ExecutorService
callbackExecutor) {
this.defaultMQProducerImpl.setCallbackExecutor(callbackExecutor);
}
+ /**
+ * Sets an Executor to be used for executing asynchronous send.
+ * If the Executor is not set, {@link
DefaultMQProducerImpl#defaultAsyncSenderExecutor} will be used.
+ *
+ * @param asyncSenderExecutor the instance of Executor
+ */
+ public void setAsyncSenderExecutor(final ExecutorService
asyncSenderExecutor){
+ this.defaultMQProducerImpl.setAsyncSenderExecutor(asyncSenderExecutor);
+ }
+
private MessageBatch batch(Collection<Message> msgs) throws
MQClientException {
MessageBatch msgBatch;
try {
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services