This is an automated email from the ASF dual-hosted git repository.
yuzhou pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 5600684eb4 [ISSUE #8725] clean DefaultMQPushConsumer after start fail (#8726)
5600684eb4 is described below
commit 5600684eb437dd5a4aeb9c658e24200bcb74909b
Author: yuz10 <[email protected]>
AuthorDate: Wed Oct 30 20:08:28 2024 +0800
[ISSUE #8725] clean DefaultMQPushConsumer after start fail (#8726)
* [ISSUE #8725]clean DefaultMQPushConsumer after start fail
* clean DefaultLitePullConsumerImpl after start fail
---
.../client/impl/consumer/DefaultLitePullConsumerImpl.java | 7 ++++++-
.../client/impl/consumer/DefaultMQPushConsumerImpl.java | 14 ++++++++++----
2 files changed, 16 insertions(+), 5 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index 3f90b67ec9..f5ff3179bf 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -307,7 +307,12 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
log.info("the consumer [{}] start OK",
this.defaultLitePullConsumer.getConsumerGroup());
- operateAfterRunning();
+ try {
+ operateAfterRunning();
+ } catch (Exception e) {
+ shutdown();
+ throw e;
+ }
break;
case RUNNING:
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index c92cadf505..4eccba8e8d 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -1006,10 +1006,16 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
break;
}
- this.updateTopicSubscribeInfoWhenSubscriptionChanged();
- this.mQClientFactory.checkClientInBroker();
- if (this.mQClientFactory.sendHeartbeatToAllBrokerWithLock()) {
- this.mQClientFactory.rebalanceImmediately();
+ try {
+ this.updateTopicSubscribeInfoWhenSubscriptionChanged();
+ this.mQClientFactory.checkClientInBroker();
+ if (this.mQClientFactory.sendHeartbeatToAllBrokerWithLock()) {
+ this.mQClientFactory.rebalanceImmediately();
+ }
+ } catch (Exception e) {
+ log.warn("Start the consumer {} fail.", this.defaultMQPushConsumer.getConsumerGroup(), e);
+ shutdown();
+ throw e;
}
}