This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch 4.9.x
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/4.9.x by this push:
new 18ff529c08 remove lock mq step in broadcasting mode rebalancing (#8773)
18ff529c08 is described below
commit 18ff529c089b67fa3f67d4721dca64b84a485ec8
Author: Liu Shengzhong <[email protected]>
AuthorDate: Tue May 13 14:08:49 2025 +0800
remove lock mq step in broadcasting mode rebalancing (#8773)
remove lock MQ step in broadcasting mode rebalancing (#8773)
---
.../org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
index 7acc25cc8f..1057e83dff 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
@@ -241,7 +241,7 @@ public abstract class RebalanceImpl {
case BROADCASTING: {
Set<MessageQueue> mqSet =
this.topicSubscribeInfoTable.get(topic);
if (mqSet != null) {
- boolean changed =
this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
+ boolean changed =
this.updateProcessQueueTableInRebalance(topic, mqSet, false);
if (changed) {
this.messageQueueChanged(topic, mqSet, mqSet);
log.info("messageQueueChanged {} {} {} {}",
@@ -327,7 +327,7 @@ public abstract class RebalanceImpl {
}
private boolean updateProcessQueueTableInRebalance(final String topic,
final Set<MessageQueue> mqSet,
- final boolean isOrder) {
+ final boolean needLockMq) {
boolean changed = false;
Iterator<Entry<MessageQueue, ProcessQueue>> it =
this.processQueueTable.entrySet().iterator();
@@ -367,7 +367,7 @@ public abstract class RebalanceImpl {
List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
- if (isOrder && !this.lock(mq)) {
+ if (needLockMq && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {},
because lock failed", consumerGroup, mq);
continue;
}