This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git
The following commit(s) were added to refs/heads/master by this push:
new e26a5fa [ISSUE #727] add RocketMQConsumerLifecycleListener (#734)
e26a5fa is described below
commit e26a5fab298d3980b367fea4fc12c246341c14d9
Author: panzhi <[email protected]>
AuthorDate: Tue Jun 17 10:51:12 2025 +0800
[ISSUE #727] add RocketMQConsumerLifecycleListener (#734)
---
.../rocketmq/samples/springboot/consumer/ACLConsumer.java | 13 ++++++++++++-
.../src/main/resources/application.properties | 2 +-
.../org/apache/rocketmq/client/core/RocketMQListener.java | 1 +
...ener.java => RocketMQPushConsumerLifecycleListener.java} | 9 ++++-----
.../rocketmq/client/support/DefaultListenerContainer.java | 8 ++++++--
.../RocketMQConsumerLifecycleListener.java} | 9 +++------
6 files changed, 27 insertions(+), 15 deletions(-)
diff --git
a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ACLConsumer.java
b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ACLConsumer.java
index 2e583b7..e35bcbb 100644
---
a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ACLConsumer.java
+++
b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ACLConsumer.java
@@ -19,19 +19,30 @@ package org.apache.rocketmq.samples.springboot.consumer;
import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
+import org.apache.rocketmq.client.apis.consumer.PushConsumerBuilder;
import org.apache.rocketmq.client.apis.message.MessageView;
import org.apache.rocketmq.client.core.RocketMQListener;
+import org.apache.rocketmq.client.core.RocketMQPushConsumerLifecycleListener;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
@Service
@RocketMQMessageListener(accessKey = "${demo.acl.rocketmq.access-key:}",
secretKey = "${demo.acl.rocketmq.secret-key:}",
tag = "${demo.acl.rocketmq.tag:}", topic =
"${demo.acl.rocketmq.topic:}",
endpoints = "${demo.acl.rocketmq.endpoints:}", consumerGroup =
"${demo.acl.rocketmq.consumer-group:}")
-public class ACLConsumer implements RocketMQListener {
+public class ACLConsumer implements RocketMQListener,
RocketMQPushConsumerLifecycleListener {
+
+ @Value("${demo.acl.rocketmq.consumptionThreadCount}")
+ private Integer consumptionThreadCount;
@Override
public ConsumeResult consume(MessageView messageView) {
System.out.println("handle my acl message:" + messageView);
return ConsumeResult.SUCCESS;
}
+
+ @Override
+ public void prepareStart(PushConsumerBuilder consumerBuilder) {
+ consumerBuilder.setConsumptionThreadCount(consumptionThreadCount);
+ }
}
diff --git
a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-acl-demo/src/main/resources/application.properties
b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-acl-demo/src/main/resources/application.properties
index 6aea301..f18e495 100644
---
a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-acl-demo/src/main/resources/application.properties
+++
b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-acl-demo/src/main/resources/application.properties
@@ -4,4 +4,4 @@ demo.acl.rocketmq.consumer-group=normalGroup
demo.acl.rocketmq.access-key=RocketMQ
demo.acl.rocketmq.secret-key=12345678
demo.acl.rocketmq.tag=*
-
+demo.acl.rocketmq.consumptionThreadCount=32
diff --git
a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/core/RocketMQListener.java
b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/core/RocketMQListener.java
index 91aa1d1..6ee35ff 100644
---
a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/core/RocketMQListener.java
+++
b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/core/RocketMQListener.java
@@ -21,5 +21,6 @@ import
org.apache.rocketmq.client.apis.consumer.MessageListener;
import org.apache.rocketmq.client.apis.message.MessageView;
public interface RocketMQListener extends MessageListener {
+ @Override
ConsumeResult consume(MessageView messageView);
}
diff --git
a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/core/RocketMQListener.java
b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/core/RocketMQPushConsumerLifecycleListener.java
similarity index 73%
copy from
rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/core/RocketMQListener.java
copy to
rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/core/RocketMQPushConsumerLifecycleListener.java
index 91aa1d1..070070a 100644
---
a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/core/RocketMQListener.java
+++
b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/core/RocketMQPushConsumerLifecycleListener.java
@@ -14,12 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.rocketmq.client.core;
-import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
-import org.apache.rocketmq.client.apis.consumer.MessageListener;
-import org.apache.rocketmq.client.apis.message.MessageView;
+import org.apache.rocketmq.client.apis.consumer.PushConsumerBuilder;
+import org.apache.rocketmq.client.support.RocketMQConsumerLifecycleListener;
-public interface RocketMQListener extends MessageListener {
- ConsumeResult consume(MessageView messageView);
+public interface RocketMQPushConsumerLifecycleListener extends
RocketMQConsumerLifecycleListener<PushConsumerBuilder> {
}
diff --git
a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/DefaultListenerContainer.java
b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/DefaultListenerContainer.java
index 69bbe60..48151cd 100644
---
a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/DefaultListenerContainer.java
+++
b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/DefaultListenerContainer.java
@@ -24,6 +24,7 @@ import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.apache.rocketmq.client.apis.consumer.PushConsumerBuilder;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.core.RocketMQListener;
+import org.apache.rocketmq.client.core.RocketMQPushConsumerLifecycleListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
@@ -251,8 +252,8 @@ public class DefaultListenerContainer implements
InitializingBean,
}
private void initRocketMQPushConsumer() {
- if (rocketMQMessageListener == null) {
- throw new IllegalArgumentException("Property
'rocketMQMessageListener' is required");
+ if (rocketMQListener == null) {
+ throw new IllegalArgumentException("Property 'rocketMQListener' is
required");
}
Assert.notNull(consumerGroup, "Property 'consumerGroup' is required");
Assert.notNull(topic, "Property 'topic' is required");
@@ -280,6 +281,9 @@ public class DefaultListenerContainer implements
InitializingBean,
.setMaxCacheMessageSizeInBytes(this.getMaxCacheMessageSizeInBytes())
.setMaxCacheMessageCount(this.getMaxCachedMessageCount())
.setMessageListener(rocketMQListener);
+ if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener)
{
+ ((RocketMQPushConsumerLifecycleListener)
rocketMQListener).prepareStart(pushConsumerBuilder);
+ }
this.setPushConsumerBuilder(pushConsumerBuilder);
}
diff --git
a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/core/RocketMQListener.java
b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQConsumerLifecycleListener.java
similarity index 70%
copy from
rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/core/RocketMQListener.java
copy to
rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQConsumerLifecycleListener.java
index 91aa1d1..6550114 100644
---
a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/core/RocketMQListener.java
+++
b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQConsumerLifecycleListener.java
@@ -14,12 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.client.core;
-import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
-import org.apache.rocketmq.client.apis.consumer.MessageListener;
-import org.apache.rocketmq.client.apis.message.MessageView;
+package org.apache.rocketmq.client.support;
-public interface RocketMQListener extends MessageListener {
- ConsumeResult consume(MessageView messageView);
+public interface RocketMQConsumerLifecycleListener<T> {
+ void prepareStart(final T consumerBuilder);
}