This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 71ec1eda49 [ISSUE #8483] Optimize unnecessary broker reverse
notification (notifyConsumerIdsChanged) in broadcast mode (#8484)
71ec1eda49 is described below
commit 71ec1eda49277a47815159cd3d118854c8dcb3c4
Author: yx9o <[email protected]>
AuthorDate: Fri Aug 30 14:20:20 2024 +0800
[ISSUE #8483] Optimize unnecessary broker reverse notification
(notifyConsumerIdsChanged) in broadcast mode (#8484)
* [ISSUE #8483] Optimize unnecessary broker reverse notification
(notifyConsumerIdsChanged) in broadcast mode
* Update
* Update test
* Update test
---
.../rocketmq/broker/client/ConsumerManager.java | 15 ++--
.../broker/client/ConsumerManagerTest.java | 93 +++++++++++-----------
2 files changed, 57 insertions(+), 51 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
index 9f838b5154..b1057e2a8d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
@@ -145,8 +145,9 @@ public class ConsumerManager {
callConsumerIdsChangeListener(ConsumerGroupEvent.UNREGISTER, next.getKey());
}
}
-
- callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE,
next.getKey(), info.getAllChannel());
+ if (!isBroadcastMode(info.getMessageModel())) {
+ callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE,
next.getKey(), info.getAllChannel());
+ }
}
}
return removed;
@@ -196,7 +197,7 @@ public class ConsumerManager {
}
if (r1 || r2) {
- if (isNotifyConsumerIdsChangedEnable) {
+ if (isNotifyConsumerIdsChangedEnable &&
!isBroadcastMode(consumerGroupInfo.getMessageModel())) {
callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE,
group, consumerGroupInfo.getAllChannel());
}
}
@@ -219,7 +220,7 @@ public class ConsumerManager {
consumerGroupInfo = prev != null ? prev : tmp;
}
boolean updateChannelRst =
consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
consumeFromWhere);
- if (updateChannelRst && isNotifyConsumerIdsChangedEnable) {
+ if (updateChannelRst && isNotifyConsumerIdsChangedEnable &&
!isBroadcastMode(consumerGroupInfo.getMessageModel())) {
callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE, group,
consumerGroupInfo.getAllChannel());
}
if (null != this.brokerStatsManager) {
@@ -244,7 +245,7 @@ public class ConsumerManager {
callConsumerIdsChangeListener(ConsumerGroupEvent.UNREGISTER, group);
}
}
- if (isNotifyConsumerIdsChangedEnable) {
+ if (isNotifyConsumerIdsChangedEnable &&
!isBroadcastMode(consumerGroupInfo.getMessageModel())) {
callConsumerIdsChangeListener(ConsumerGroupEvent.CHANGE,
group, consumerGroupInfo.getAllChannel());
}
}
@@ -334,4 +335,8 @@ public class ConsumerManager {
}
}
}
+
+ private boolean isBroadcastMode(final MessageModel messageModel) {
+ return MessageModel.BROADCASTING.equals(messageModel);
+ }
}
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerTest.java
index 8c90982434..a23ad20037 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerTest.java
@@ -18,10 +18,7 @@
package org.apache.rocketmq.broker.client;
import io.netty.channel.Channel;
-import java.util.HashSet;
-import java.util.Set;
import org.apache.rocketmq.broker.BrokerController;
-import org.apache.rocketmq.broker.client.net.Broker2Client;
import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
@@ -30,13 +27,25 @@ import
org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
-import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
@@ -49,19 +58,10 @@ public class ConsumerManagerTest {
private ConsumerManager consumerManager;
- private DefaultConsumerIdsChangeListener defaultConsumerIdsChangeListener;
-
@Mock
private BrokerController brokerController;
- @Mock
- private ConsumerFilterManager consumerFilterManager;
-
- private BrokerConfig brokerConfig = new BrokerConfig();
-
- private Broker2Client broker2Client;
-
- private BrokerStatsManager brokerStatsManager;
+ private final BrokerConfig brokerConfig = new BrokerConfig();
private static final String GROUP = "DEFAULT_GROUP";
@@ -74,40 +74,38 @@ public class ConsumerManagerTest {
@Before
public void before() {
clientChannelInfo = new ClientChannelInfo(channel, CLIENT_ID,
LanguageCode.JAVA, VERSION);
- defaultConsumerIdsChangeListener = new
DefaultConsumerIdsChangeListener(brokerController);
- brokerStatsManager = new BrokerStatsManager(brokerConfig);
- consumerManager = new
ConsumerManager(defaultConsumerIdsChangeListener, brokerStatsManager,
brokerConfig);
- broker2Client = new Broker2Client(brokerController);
+ DefaultConsumerIdsChangeListener defaultConsumerIdsChangeListener =
new DefaultConsumerIdsChangeListener(brokerController);
+ BrokerStatsManager brokerStatsManager = new
BrokerStatsManager(brokerConfig);
+ consumerManager = spy(new
ConsumerManager(defaultConsumerIdsChangeListener, brokerStatsManager,
brokerConfig));
+ ConsumerFilterManager consumerFilterManager =
mock(ConsumerFilterManager.class);
when(brokerController.getConsumerFilterManager()).thenReturn(consumerFilterManager);
- when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
- when(brokerController.getBroker2Client()).thenReturn(broker2Client);
}
@Test
public void compensateBasicConsumerInfoTest() {
ConsumerGroupInfo consumerGroupInfo =
consumerManager.getConsumerGroupInfo(GROUP, true);
- Assertions.assertThat(consumerGroupInfo).isNull();
+ assertThat(consumerGroupInfo).isNull();
consumerManager.compensateBasicConsumerInfo(GROUP,
ConsumeType.CONSUME_ACTIVELY, MessageModel.BROADCASTING);
consumerGroupInfo = consumerManager.getConsumerGroupInfo(GROUP, true);
- Assertions.assertThat(consumerGroupInfo).isNotNull();
-
Assertions.assertThat(consumerGroupInfo.getConsumeType()).isEqualTo(ConsumeType.CONSUME_ACTIVELY);
-
Assertions.assertThat(consumerGroupInfo.getMessageModel()).isEqualTo(MessageModel.BROADCASTING);
+ assertThat(consumerGroupInfo).isNotNull();
+
assertThat(consumerGroupInfo.getConsumeType()).isEqualTo(ConsumeType.CONSUME_ACTIVELY);
+
assertThat(consumerGroupInfo.getMessageModel()).isEqualTo(MessageModel.BROADCASTING);
}
@Test
public void compensateSubscribeDataTest() {
ConsumerGroupInfo consumerGroupInfo =
consumerManager.getConsumerGroupInfo(GROUP, true);
- Assertions.assertThat(consumerGroupInfo).isNull();
+ assertThat(consumerGroupInfo).isNull();
consumerManager.compensateSubscribeData(GROUP, TOPIC, new
SubscriptionData(TOPIC, SubscriptionData.SUB_ALL));
consumerGroupInfo = consumerManager.getConsumerGroupInfo(GROUP, true);
- Assertions.assertThat(consumerGroupInfo).isNotNull();
-
Assertions.assertThat(consumerGroupInfo.getSubscriptionTable().size()).isEqualTo(1);
+ assertThat(consumerGroupInfo).isNotNull();
+
assertThat(consumerGroupInfo.getSubscriptionTable().size()).isEqualTo(1);
SubscriptionData subscriptionData =
consumerGroupInfo.getSubscriptionTable().get(TOPIC);
- Assertions.assertThat(subscriptionData).isNotNull();
- Assertions.assertThat(subscriptionData.getTopic()).isEqualTo(TOPIC);
-
Assertions.assertThat(subscriptionData.getSubString()).isEqualTo(SubscriptionData.SUB_ALL);
+ assertThat(subscriptionData).isNotNull();
+ assertThat(subscriptionData.getTopic()).isEqualTo(TOPIC);
+
assertThat(subscriptionData.getSubString()).isEqualTo(SubscriptionData.SUB_ALL);
}
@Test
@@ -118,7 +116,8 @@ public class ConsumerManagerTest {
subList.add(subscriptionData);
consumerManager.registerConsumer(GROUP, clientChannelInfo,
ConsumeType.CONSUME_PASSIVELY,
MessageModel.BROADCASTING,
ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET, subList, true);
-
Assertions.assertThat(consumerManager.getConsumerTable().get(GROUP)).isNotNull();
+ verify(consumerManager,
never()).callConsumerIdsChangeListener(eq(ConsumerGroupEvent.CHANGE), any(),
any());
+ assertThat(consumerManager.getConsumerTable().get(GROUP)).isNotNull();
}
@Test
@@ -128,63 +127,65 @@ public class ConsumerManagerTest {
// unregister
consumerManager.unregisterConsumer(GROUP, clientChannelInfo, true);
-
Assertions.assertThat(consumerManager.getConsumerTable().get(GROUP)).isNull();
+ verify(consumerManager,
never()).callConsumerIdsChangeListener(eq(ConsumerGroupEvent.CHANGE), any(),
any());
+ assertThat(consumerManager.getConsumerTable().get(GROUP)).isNull();
}
@Test
public void findChannelTest() {
register();
final ClientChannelInfo consumerManagerChannel =
consumerManager.findChannel(GROUP, CLIENT_ID);
- Assertions.assertThat(consumerManagerChannel).isNotNull();
+ assertThat(consumerManagerChannel).isNotNull();
}
@Test
public void findSubscriptionDataTest() {
register();
final SubscriptionData subscriptionData =
consumerManager.findSubscriptionData(GROUP, TOPIC);
- Assertions.assertThat(subscriptionData).isNotNull();
+ assertThat(subscriptionData).isNotNull();
}
@Test
public void findSubscriptionDataCountTest() {
register();
final int count = consumerManager.findSubscriptionDataCount(GROUP);
- assert count > 0;
+ assertTrue(count > 0);
}
@Test
public void findSubscriptionTest() {
SubscriptionData subscriptionData =
consumerManager.findSubscriptionData(GROUP, TOPIC, true);
- Assertions.assertThat(subscriptionData).isNull();
+ assertThat(subscriptionData).isNull();
consumerManager.compensateSubscribeData(GROUP, TOPIC, new
SubscriptionData(TOPIC, SubscriptionData.SUB_ALL));
subscriptionData = consumerManager.findSubscriptionData(GROUP, TOPIC,
true);
- Assertions.assertThat(subscriptionData).isNotNull();
- Assertions.assertThat(subscriptionData.getTopic()).isEqualTo(TOPIC);
-
Assertions.assertThat(subscriptionData.getSubString()).isEqualTo(SubscriptionData.SUB_ALL);
+ assertThat(subscriptionData).isNotNull();
+ assertThat(subscriptionData.getTopic()).isEqualTo(TOPIC);
+
assertThat(subscriptionData.getSubString()).isEqualTo(SubscriptionData.SUB_ALL);
subscriptionData = consumerManager.findSubscriptionData(GROUP, TOPIC,
false);
- Assertions.assertThat(subscriptionData).isNull();
+ assertThat(subscriptionData).isNull();
}
@Test
public void scanNotActiveChannelTest() {
clientChannelInfo.setLastUpdateTimestamp(System.currentTimeMillis() -
brokerConfig.getChannelExpiredTimeout() * 2);
consumerManager.scanNotActiveChannel();
-
Assertions.assertThat(consumerManager.getConsumerTable().size()).isEqualTo(0);
+ assertThat(consumerManager.getConsumerTable().size()).isEqualTo(0);
}
@Test
public void queryTopicConsumeByWhoTest() {
register();
final HashSet<String> consumeGroup =
consumerManager.queryTopicConsumeByWho(TOPIC);
- assert consumeGroup.size() > 0;
+ assertFalse(consumeGroup.isEmpty());
}
@Test
public void doChannelCloseEventTest() {
consumerManager.doChannelCloseEvent("127.0.0.1", channel);
- assert consumerManager.getConsumerTable().size() == 0;
+ verify(consumerManager,
never()).callConsumerIdsChangeListener(eq(ConsumerGroupEvent.CHANGE), any(),
any());
+ assertEquals(0, consumerManager.getConsumerTable().size());
}
private void register() {
@@ -203,8 +204,8 @@ public class ConsumerManagerTest {
consumerManager.compensateSubscribeData(GROUP, TOPIC,
subscriptionData);
consumerManager.compensateSubscribeData(GROUP, TOPIC + "_1", new
SubscriptionData(TOPIC, SubscriptionData.SUB_ALL));
consumerManager.removeExpireConsumerGroupInfo();
- Assertions.assertThat(consumerManager.getConsumerGroupInfo(GROUP,
true)).isNotNull();
- Assertions.assertThat(consumerManager.findSubscriptionData(GROUP,
TOPIC)).isNull();
- Assertions.assertThat(consumerManager.findSubscriptionData(GROUP,
TOPIC + "_1")).isNotNull();
+ assertThat(consumerManager.getConsumerGroupInfo(GROUP,
true)).isNotNull();
+ assertThat(consumerManager.findSubscriptionData(GROUP,
TOPIC)).isNull();
+ assertThat(consumerManager.findSubscriptionData(GROUP, TOPIC +
"_1")).isNotNull();
}
}