This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git


The following commit(s) were added to refs/heads/master by this push:
     new e26a5fa  [ISSUE #727] add RocketMQConsumerLifecycleListener (#734)
e26a5fa is described below

commit e26a5fab298d3980b367fea4fc12c246341c14d9
Author: panzhi <[email protected]>
AuthorDate: Tue Jun 17 10:51:12 2025 +0800

    [ISSUE #727] add RocketMQConsumerLifecycleListener (#734)
---
 .../rocketmq/samples/springboot/consumer/ACLConsumer.java   | 13 ++++++++++++-
 .../src/main/resources/application.properties               |  2 +-
 .../org/apache/rocketmq/client/core/RocketMQListener.java   |  1 +
 ...ener.java => RocketMQPushConsumerLifecycleListener.java} |  9 ++++-----
 .../rocketmq/client/support/DefaultListenerContainer.java   |  8 ++++++--
 .../RocketMQConsumerLifecycleListener.java}                 |  9 +++------
 6 files changed, 27 insertions(+), 15 deletions(-)

diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ACLConsumer.java b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ACLConsumer.java
index 2e583b7..e35bcbb 100644
--- a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ACLConsumer.java
+++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/consumer/ACLConsumer.java
@@ -19,19 +19,30 @@ package org.apache.rocketmq.samples.springboot.consumer;
 
 import org.apache.rocketmq.client.annotation.RocketMQMessageListener;
 import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
+import org.apache.rocketmq.client.apis.consumer.PushConsumerBuilder;
 import org.apache.rocketmq.client.apis.message.MessageView;
 import org.apache.rocketmq.client.core.RocketMQListener;
+import org.apache.rocketmq.client.core.RocketMQPushConsumerLifecycleListener;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 
 @Service
 @RocketMQMessageListener(accessKey = "${demo.acl.rocketmq.access-key:}", secretKey = "${demo.acl.rocketmq.secret-key:}",
         tag = "${demo.acl.rocketmq.tag:}", topic = "${demo.acl.rocketmq.topic:}",
         endpoints = "${demo.acl.rocketmq.endpoints:}", consumerGroup = "${demo.acl.rocketmq.consumer-group:}")
-public class ACLConsumer implements RocketMQListener {
+public class ACLConsumer implements RocketMQListener, RocketMQPushConsumerLifecycleListener {
+
+    @Value("${demo.acl.rocketmq.consumptionThreadCount}")
+    private Integer consumptionThreadCount;
     @Override
     public ConsumeResult consume(MessageView messageView) {
         System.out.println("handle my acl message:" + messageView);
         return ConsumeResult.SUCCESS;
     }
+
+    @Override
+    public void prepareStart(PushConsumerBuilder consumerBuilder) {
+        consumerBuilder.setConsumptionThreadCount(consumptionThreadCount);
+    }
 }
 
diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-acl-demo/src/main/resources/application.properties b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-acl-demo/src/main/resources/application.properties
index 6aea301..f18e495 100644
--- a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-acl-demo/src/main/resources/application.properties
+++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-acl-demo/src/main/resources/application.properties
@@ -4,4 +4,4 @@ demo.acl.rocketmq.consumer-group=normalGroup
 demo.acl.rocketmq.access-key=RocketMQ
 demo.acl.rocketmq.secret-key=12345678
 demo.acl.rocketmq.tag=*
-
+demo.acl.rocketmq.consumptionThreadCount=32
diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/core/RocketMQListener.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/core/RocketMQListener.java
index 91aa1d1..6ee35ff 100644
--- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/core/RocketMQListener.java
+++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/core/RocketMQListener.java
@@ -21,5 +21,6 @@ import org.apache.rocketmq.client.apis.consumer.MessageListener;
 import org.apache.rocketmq.client.apis.message.MessageView;
 
 public interface RocketMQListener extends MessageListener {
+    @Override
     ConsumeResult consume(MessageView messageView);
 }
diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/core/RocketMQListener.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/core/RocketMQPushConsumerLifecycleListener.java
similarity index 73%
copy from rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/core/RocketMQListener.java
copy to rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/core/RocketMQPushConsumerLifecycleListener.java
index 91aa1d1..070070a 100644
--- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/core/RocketMQListener.java
+++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/core/RocketMQPushConsumerLifecycleListener.java
@@ -14,12 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.rocketmq.client.core;
 
-import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
-import org.apache.rocketmq.client.apis.consumer.MessageListener;
-import org.apache.rocketmq.client.apis.message.MessageView;
+import org.apache.rocketmq.client.apis.consumer.PushConsumerBuilder;
+import org.apache.rocketmq.client.support.RocketMQConsumerLifecycleListener;
 
-public interface RocketMQListener extends MessageListener {
-    ConsumeResult consume(MessageView messageView);
+public interface RocketMQPushConsumerLifecycleListener extends RocketMQConsumerLifecycleListener<PushConsumerBuilder> {
 }
diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/DefaultListenerContainer.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/DefaultListenerContainer.java
index 69bbe60..48151cd 100644
--- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/DefaultListenerContainer.java
+++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/DefaultListenerContainer.java
@@ -24,6 +24,7 @@ import org.apache.rocketmq.client.apis.consumer.PushConsumer;
 import org.apache.rocketmq.client.apis.consumer.PushConsumerBuilder;
 import org.apache.rocketmq.client.apis.consumer.FilterExpression;
 import org.apache.rocketmq.client.core.RocketMQListener;
+import org.apache.rocketmq.client.core.RocketMQPushConsumerLifecycleListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.BeansException;
@@ -251,8 +252,8 @@ public class DefaultListenerContainer implements InitializingBean,
     }
 
     private void initRocketMQPushConsumer() {
-        if (rocketMQMessageListener == null) {
-            throw new IllegalArgumentException("Property 'rocketMQMessageListener' is required");
+        if (rocketMQListener == null) {
+            throw new IllegalArgumentException("Property 'rocketMQListener' is required");
         }
         Assert.notNull(consumerGroup, "Property 'consumerGroup' is required");
         Assert.notNull(topic, "Property 'topic' is required");
@@ -280,6 +281,9 @@ public class DefaultListenerContainer implements InitializingBean,
                 .setMaxCacheMessageSizeInBytes(this.getMaxCacheMessageSizeInBytes())
                 .setMaxCacheMessageCount(this.getMaxCachedMessageCount())
                 .setMessageListener(rocketMQListener);
+        if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
+            ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(pushConsumerBuilder);
+        }
         this.setPushConsumerBuilder(pushConsumerBuilder);
     }
 
diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/core/RocketMQListener.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQConsumerLifecycleListener.java
similarity index 70%
copy from rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/core/RocketMQListener.java
copy to rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQConsumerLifecycleListener.java
index 91aa1d1..6550114 100644
--- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/core/RocketMQListener.java
+++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQConsumerLifecycleListener.java
@@ -14,12 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.client.core;
 
-import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
-import org.apache.rocketmq.client.apis.consumer.MessageListener;
-import org.apache.rocketmq.client.apis.message.MessageView;
+package org.apache.rocketmq.client.support;
 
-public interface RocketMQListener extends MessageListener {
-    ConsumeResult consume(MessageView messageView);
+public interface RocketMQConsumerLifecycleListener<T> {
+    void prepareStart(final T consumerBuilder);
 }

Reply via email to