This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git
The following commit(s) were added to refs/heads/master by this push:
new 603903c [ISSUE #262] Sending messages asynchronously use a custom
AsyncSenderExecutor in RocketMQTemplate.
603903c is described below
commit 603903c2eb199ee9165e81e1f4eea9694399a551
Author: zhangjidi <[email protected]>
AuthorDate: Mon Jun 8 14:37:31 2020 +0800
[ISSUE #262] Sending messages asynchronously use a custom
AsyncSenderExecutor in RocketMQTemplate.
---
.../rocketmq/spring/core/RocketMQTemplate.java | 6 ++++
.../rocketmq/spring/core/RocketMQTemplateTest.java | 39 ++++++++++++++++++++++
2 files changed, 45 insertions(+)
diff --git
a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
index 6683f10..78f0b2d 100644
---
a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
+++
b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/core/RocketMQTemplate.java
@@ -24,6 +24,8 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
@@ -86,6 +88,10 @@ public class RocketMQTemplate extends
AbstractMessageSendingTemplate<String> imp
this.messageQueueSelector = messageQueueSelector;
}
+ public void setAsyncSenderExecutor(ExecutorService asyncSenderExecutor) {
+ this.producer.setAsyncSenderExecutor(asyncSenderExecutor);
+ }
+
/**
* @param destination formats: `topicName:tags`
* @param message {@link org.springframework.messaging.Message} the
message to be sent.
diff --git
a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java
b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java
index da6d777..56a7de6 100644
---
a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java
+++
b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java
@@ -34,6 +34,13 @@ import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
@@ -88,6 +95,38 @@ public class RocketMQTemplateTest {
}
@Test
+ public void testSendMessage_withCustomAsyncSenderExecutor() {
+ ExecutorService executorService = new ThreadPoolExecutor(
+ 2,
+ 5,
+ 100,
+ TimeUnit.SECONDS,
+ new ArrayBlockingQueue<Runnable>(2000),
+ new ThreadFactory() {
+ private AtomicInteger threadIndex = new AtomicInteger(0);
+
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "AsyncSenderExecutor_" +
this.threadIndex.incrementAndGet());
+ }
+ });
+ rocketMQTemplate.setAsyncSenderExecutor(executorService);
+ try {
+ rocketMQTemplate.asyncSend(topic, "payload", new SendCallback() {
+ @Override public void onSuccess(SendResult sendResult) {
+
+ }
+
+ @Override public void onException(Throwable e) {
+
+ }
+ });
+ } catch (MessagingException e) {
+
assertThat(e).hasMessageContaining("org.apache.rocketmq.remoting.exception.RemotingConnectException:
connect to [127.0.0.1:9876] failed");
+ }
+ }
+
+ @Test
public void testSendAndReceive_NullMessage() {
try {
String response =
rocketMQTemplate.sendAndReceive(stringRequestTopic, new Message<String>() {