This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git


The following commit(s) were added to refs/heads/master by this push:
     new 603903c  [ISSUE #262] Sending messages asynchronously use a custom AsyncSenderExecutor in RocketMQTemplate.
603903c is described below

commit 603903c2eb199ee9165e81e1f4eea9694399a551
Author: zhangjidi <[email protected]>
AuthorDate: Mon Jun 8 14:37:31 2020 +0800

    [ISSUE #262] Sending messages asynchronously use a custom AsyncSenderExecutor in RocketMQTemplate.
---
 .../rocketmq/spring/core/RocketMQTemplate.java     |  6 ++++
 .../rocketmq/spring/core/RocketMQTemplateTest.java | 39 ++++++++++++++++++++++
 2 files changed, 45 insertions(+)

diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
index 6683f10..78f0b2d 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
@@ -24,6 +24,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.MessageQueueSelector;
@@ -86,6 +88,10 @@ public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> imp
         this.messageQueueSelector = messageQueueSelector;
     }
 
+    public void setAsyncSenderExecutor(ExecutorService asyncSenderExecutor) {
+        this.producer.setAsyncSenderExecutor(asyncSenderExecutor);
+    }
+
     /**
      * @param destination formats: `topicName:tags`
      * @param message {@link org.springframework.messaging.Message} the message to be sent.
diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java
index da6d777..56a7de6 100644
--- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java
+++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java
@@ -34,6 +34,13 @@ import org.springframework.messaging.MessagingException;
 import org.springframework.messaging.support.MessageBuilder;
 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.mockito.ArgumentMatchers.any;
 
@@ -88,6 +95,38 @@ public class RocketMQTemplateTest {
     }
 
     @Test
+    public void testSendMessage_withCustomAsyncSenderExecutor() {
+        ExecutorService executorService = new ThreadPoolExecutor(
+            2,
+            5,
+            100,
+            TimeUnit.SECONDS,
+            new ArrayBlockingQueue<Runnable>(2000),
+            new ThreadFactory() {
+                private AtomicInteger threadIndex = new AtomicInteger(0);
+
+                @Override
+                public Thread newThread(Runnable r) {
+                    return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
+                }
+            });
+        rocketMQTemplate.setAsyncSenderExecutor(executorService);
+        try {
+            rocketMQTemplate.asyncSend(topic, "payload", new SendCallback() {
+                @Override public void onSuccess(SendResult sendResult) {
+
+                }
+
+                @Override public void onException(Throwable e) {
+
+                }
+            });
+        } catch (MessagingException e) {
+            assertThat(e).hasMessageContaining("org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [127.0.0.1:9876] failed");
+        }
+    }
+
+    @Test
     public void testSendAndReceive_NullMessage() {
         try {
             String response = rocketMQTemplate.sendAndReceive(stringRequestTopic, new Message<String>() {

Reply via email to