This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch 4.9.x
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/4.9.x by this push:
     new 18ff529c08 remove lock mq step in broadcasting mode rebalancing (#8773)
18ff529c08 is described below

commit 18ff529c089b67fa3f67d4721dca64b84a485ec8
Author: Liu Shengzhong <[email protected]>
AuthorDate: Tue May 13 14:08:49 2025 +0800

    remove lock mq step in broadcasting mode rebalancing (#8773)
    
    remove lock MQ step in broadcasting mode rebalancing (#8773)
---
 .../org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java     | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
index 7acc25cc8f..1057e83dff 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
@@ -241,7 +241,7 @@ public abstract class RebalanceImpl {
             case BROADCASTING: {
                 Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                 if (mqSet != null) {
-                    boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
+                    boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, false);
                     if (changed) {
                         this.messageQueueChanged(topic, mqSet, mqSet);
                         log.info("messageQueueChanged {} {} {} {}",
@@ -327,7 +327,7 @@ public abstract class RebalanceImpl {
     }
 
     private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet,
-        final boolean isOrder) {
+        final boolean needLockMq) {
         boolean changed = false;
 
         Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
@@ -367,7 +367,7 @@ public abstract class RebalanceImpl {
         List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
         for (MessageQueue mq : mqSet) {
             if (!this.processQueueTable.containsKey(mq)) {
-                if (isOrder && !this.lock(mq)) {
+                if (needLockMq && !this.lock(mq)) {
                     log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
                     continue;
                 }

Reply via email to