This is an automated email from the ASF dual-hosted git repository.
RongtongJin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 1730dbc040 [ISSUE #10560] Remove enableLiteEventMode config switch
(#10561)
1730dbc040 is described below
commit 1730dbc040b04a26dca5128490e37796aac9b007
Author: Quan <[email protected]>
AuthorDate: Wed Jul 1 13:50:53 2026 +0800
[ISSUE #10560] Remove enableLiteEventMode config switch (#10561)
* [ISSUE #10560] Remove enableLiteEventMode config switch
- Remove enableLiteEventMode field, getter and setter from BrokerConfig
- Remove 5 early-return guard checks in LiteEventDispatcher
- Remove dead condition in PopLiteMessageProcessor.popLiteTopic
- Simplify getEventIterator to always use event-set path
- Delete unused LiteSubscriptionIterator inner class
- Remove disabled-mode test cases and stale Javadoc references
* chore: retrigger CI
---
.../rocketmq/broker/lite/LiteEventDispatcher.java | 52 ++----------------
.../broker/processor/PopLiteMessageProcessor.java | 4 --
.../broker/lite/LiteEventDispatcherTest.java | 61 ----------------------
.../processor/PopLiteMessageProcessorTest.java | 13 -----
.../org/apache/rocketmq/common/BrokerConfig.java | 8 ---
5 files changed, 3 insertions(+), 135 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/lite/LiteEventDispatcher.java
b/broker/src/main/java/org/apache/rocketmq/broker/lite/LiteEventDispatcher.java
index 1267856da6..2bd1f36186 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/lite/LiteEventDispatcher.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/lite/LiteEventDispatcher.java
@@ -34,7 +34,6 @@ import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
@@ -87,18 +86,13 @@ public class LiteEventDispatcher extends ServiceThread {
}
/**
- * If event mode is enabled, try to dispatch event to one client when
message arriving or available.
+ * Try to dispatch event to one client when message arriving or available.
* In most cases, there is only one subscriber for a LMQ under a consumer
group,
* but also supports multiple clients consuming in share mode.
* When group is null, dispatch to all subscribers regardless of their
group,
* when group is specified, only dispatch to subscribers belonging to this
group.
- * <p>
- * If the expected number of subscriptions by each client is small,
disabling event mode can be a choice.
*/
public void dispatch(String group, String lmqName, int queueId, long
offset, long msgStoreTime) {
- if (!this.brokerController.getBrokerConfig().isEnableLiteEventMode()) {
- return;
- }
if (queueId != 0 || !LiteUtil.isLiteTopicQueue(lmqName)) {
return;
}
@@ -106,9 +100,6 @@ public class LiteEventDispatcher extends ServiceThread {
}
protected void doDispatch(String group, String lmqName, String
excludeClientId) {
- if (!this.brokerController.getBrokerConfig().isEnableLiteEventMode()) {
- return;
- }
SubscriberWrapper wrapper =
liteSubscriptionRegistry.getAllSubscriber(group, lmqName);
if (null == wrapper) {
return;
@@ -134,9 +125,6 @@ public class LiteEventDispatcher extends ServiceThread {
*/
@VisibleForTesting
public boolean selectAndDispatch(String lmqName, List<ClientGroup>
clients, String excludeClientId) {
- if (!this.brokerController.getBrokerConfig().isEnableLiteEventMode()) {
- return true;
- }
if (CollectionUtils.isEmpty(clients)) {
return true;
}
@@ -203,18 +191,10 @@ public class LiteEventDispatcher extends ServiceThread {
/**
* Get an iterator for iterating over events for a specific client.
- * In lite event mode, returns events from the client's event queue,
- * or else returns topics from the client's subscription.
+ * Returns events from the client's event queue.
*/
public Iterator<String> getEventIterator(String clientId) {
- if (this.brokerController.getBrokerConfig().isEnableLiteEventMode()) {
- return new EventSetIterator(clientEventMap.get(clientId));
- } else {
- LiteSubscription liteSubscription =
liteSubscriptionRegistry.getLiteSubscription(clientId);
- return liteSubscription != null &&
liteSubscription.getLiteTopicSet() != null ?
- new LiteSubscriptionIterator(liteSubscription.getTopic(),
liteSubscription.getLiteTopicSet().iterator())
- : Collections.emptyIterator();
- }
+ return new EventSetIterator(clientEventMap.get(clientId));
}
/**
@@ -224,9 +204,6 @@ public class LiteEventDispatcher extends ServiceThread {
* with available messages.
*/
public void doFullDispatchForClient(String clientId, String group) {
- if (!this.brokerController.getBrokerConfig().isEnableLiteEventMode()) {
- return;
- }
LiteSubscription subscription =
liteSubscriptionRegistry.getLiteSubscription(clientId);
if (null == subscription ||
CollectionUtils.isEmpty(subscription.getLiteTopicSet())) {
LOGGER.info("client full dispatch, but no subscription. {}",
clientId);
@@ -279,9 +256,6 @@ public class LiteEventDispatcher extends ServiceThread {
* It iterates through all LMQ topics in CQ table, so it may be a heavy
work.
*/
public void doFullDispatchForWildcardGroup(String group) {
- if (!this.brokerController.getBrokerConfig().isEnableLiteEventMode()) {
- return;
- }
String parentTopic = LiteMetadataUtil.getLiteBindTopic(group,
brokerController);
if (null == parentTopic || !LiteMetadataUtil.isWildcardGroup(group,
brokerController)) {
return;
@@ -573,26 +547,6 @@ public class LiteEventDispatcher extends ServiceThread {
}
}
- static class LiteSubscriptionIterator implements Iterator<String> {
- private final Iterator<String> iterator;
- private final String parentTopic;
-
- public LiteSubscriptionIterator(String parentTopic, Iterator<String>
iterator) {
- this.parentTopic = parentTopic;
- this.iterator = iterator;
- }
-
- @Override
- public boolean hasNext() {
- return iterator.hasNext();
- }
-
- @Override
- public String next() {
- return iterator.next();
- }
- }
-
protected static class FullDispatchRequest {
private final String clientId;
private final String group;
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessor.java
index 4da72ef4bd..167597dab3 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessor.java
@@ -287,10 +287,6 @@ public class PopLiteMessageProcessor implements
NettyRequestProcessor {
@VisibleForTesting
public Pair<StringBuilder, GetMessageResult> popLiteTopic(String
parentTopic, String clientHost, String group,
String lmqName, long maxNum, long popTime, long invisibleTime, String
attemptId) {
- if (!brokerController.getBrokerConfig().isEnableLiteEventMode()
- &&
!brokerController.getLiteLifecycleManager().isLmqExist(lmqName)) {
- return null;
- }
String lockKey = KeyBuilder.buildPopLiteLockKey(group, lmqName);
if (!lockService.tryLock(lockKey)) {
return null;
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/lite/LiteEventDispatcherTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/lite/LiteEventDispatcherTest.java
index 36e4ae2378..84aec24829 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/lite/LiteEventDispatcherTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/lite/LiteEventDispatcherTest.java
@@ -104,23 +104,14 @@ public class LiteEventDispatcherTest {
verify(liteSubscriptionRegistry).addListener(any(LiteEventDispatcher.LiteCtlListenerImpl.class));
}
- @Test
- public void testDispatchWhenEventModeDisabled() {
- brokerConfig.setEnableLiteEventMode(false);
- liteEventDispatcher.dispatch("group", "lmqName", 0, 0L, 0L);
- verify(liteSubscriptionRegistry,
never()).getAllSubscriber(anyString(), anyString());
- }
-
@Test
public void testDispatchWhenQueueIdNotZero() {
- brokerConfig.setEnableLiteEventMode(true);
liteEventDispatcher.dispatch("group", "lmqName", 1, 0L, 0L);
verify(liteSubscriptionRegistry,
never()).getAllSubscriber(anyString(), anyString());
}
@Test
public void testDispatchCallsDoDispatch() {
- brokerConfig.setEnableLiteEventMode(true);
String lmqName = LiteUtil.toLmqName("parentTopic", "lmqName");
LiteEventDispatcher spyDispatcher = Mockito.spy(liteEventDispatcher);
spyDispatcher.dispatch("group", lmqName, 0, 0L, 0L);
@@ -129,7 +120,6 @@ public class LiteEventDispatcherTest {
@Test
public void testDoDispatchWhenWrapperIsNull() {
- brokerConfig.setEnableLiteEventMode(true);
when(liteSubscriptionRegistry.getAllSubscriber("group",
"lmqName")).thenReturn(null);
// Use reflection to access private method
@@ -147,8 +137,6 @@ public class LiteEventDispatcherTest {
@Test
public void testDoDispatchWithListWrapper() {
- brokerConfig.setEnableLiteEventMode(true);
-
SubscriptionGroupConfig subscriptionGroupConfig = new
SubscriptionGroupConfig();
subscriptionGroupConfig.setWildcardLiteGroup(false);
when(subscriptionGroupManager.findSubscriptionGroupConfig("group")).thenReturn(subscriptionGroupConfig);
@@ -166,8 +154,6 @@ public class LiteEventDispatcherTest {
@Test
public void testDoDispatchWithMapWrapper() {
- brokerConfig.setEnableLiteEventMode(true);
-
SubscriberWrapper.MapWrapper mapWrapper =
mock(SubscriberWrapper.MapWrapper.class);
Map<String, List<ClientGroup>> groupMap = new HashMap<>();
groupMap.put("key", Collections.singletonList(new
ClientGroup("clientId", "group")));
@@ -189,17 +175,8 @@ public class LiteEventDispatcherTest {
assertTrue(result);
}
- @Test
- public void testSelectAndDispatchWhenEventModeDisabled() {
- brokerConfig.setEnableLiteEventMode(false);
- List<ClientGroup> clients = Collections.singletonList(new
ClientGroup("clientId", "group"));
- boolean result = liteEventDispatcher.selectAndDispatch("lmqName",
clients, null);
- assertTrue(result);
- }
-
@Test
public void testSelectAndDispatchSelectsClientAndDispatches() {
- brokerConfig.setEnableLiteEventMode(true);
List<ClientGroup> clients = Collections.singletonList(new
ClientGroup("clientId", "group"));
LiteEventDispatcher spyDispatcher = Mockito.spy(liteEventDispatcher);
@@ -212,7 +189,6 @@ public class LiteEventDispatcherTest {
@Test
public void testSelectAndDispatchExcludesSpecifiedClient() {
- brokerConfig.setEnableLiteEventMode(true);
List<ClientGroup> clients = Arrays.asList(
new ClientGroup("excludeId", "group"),
new ClientGroup("clientId", "group")
@@ -263,7 +239,6 @@ public class LiteEventDispatcherTest {
@Test
public void testGetEventIteratorInEventMode() {
- brokerConfig.setEnableLiteEventMode(true);
String clientId = "clientId";
String group = "group";
@@ -275,25 +250,8 @@ public class LiteEventDispatcherTest {
assertFalse(iterator.hasNext());
}
- @Test
- public void testGetEventIteratorWhenNotInEventMode() {
- brokerConfig.setEnableLiteEventMode(false);
- String clientId = "clientId";
- LiteSubscription subscription = mock(LiteSubscription.class);
- Set<String> topicSet = new HashSet<>();
- topicSet.add("topic1");
- when(subscription.getLiteTopicSet()).thenReturn(topicSet);
-
when(liteSubscriptionRegistry.getLiteSubscription(clientId)).thenReturn(subscription);
-
- Iterator<String> iterator =
liteEventDispatcher.getEventIterator(clientId);
- assertNotNull(iterator);
- assertTrue(iterator.hasNext());
- assertEquals("topic1", iterator.next());
- }
-
@Test
public void testDoFullDispatchForClientWhenSubscriptionIsNull() {
- brokerConfig.setEnableLiteEventMode(true);
String clientId = "clientId";
String group = "group";
@@ -305,7 +263,6 @@ public class LiteEventDispatcherTest {
@Test
public void testDoFullDispatchForClientWhenSubscriptionHasNoTopics() {
- brokerConfig.setEnableLiteEventMode(true);
String clientId = "clientId";
String group = "group";
@@ -456,23 +413,6 @@ public class LiteEventDispatcherTest {
assertFalse(iterator.hasNext());
}
- @Test
- public void testLiteSubscriptionIteratorHasNextAndNext() {
- Set<String> topics = new HashSet<>();
- topics.add("topic1");
- topics.add("topic2");
- Iterator<String> topicIterator = topics.iterator();
-
- LiteEventDispatcher.LiteSubscriptionIterator iterator =
- new LiteEventDispatcher.LiteSubscriptionIterator("parentTopic",
topicIterator);
-
- assertTrue(iterator.hasNext());
- assertNotNull(iterator.next());
- assertTrue(iterator.hasNext());
- assertNotNull(iterator.next());
- assertFalse(iterator.hasNext());
- }
-
@Test
public void testComparatorComparesTimestampsCorrectly() {
String clientId1 = "clientId1";
@@ -568,7 +508,6 @@ public class LiteEventDispatcherTest {
String clientId = "testClientId";
String group = "testGroup";
String lmqName = "testLmq";
- brokerConfig.setEnableLiteEventMode(true);
LiteSubscription subscription = new LiteSubscription();
Set<String> topics = new HashSet<>();
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessorTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessorTest.java
index 957386b166..ee6d6101b1 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessorTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessorTest.java
@@ -112,7 +112,6 @@ public class PopLiteMessageProcessorTest {
when(brokerController.getMessageStore()).thenReturn(messageStore);
when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager);
when(brokerController.getSubscriptionGroupManager()).thenReturn(subscriptionGroupManager);
-
when(brokerController.getLiteLifecycleManager()).thenReturn(liteLifecycleManager);
when(brokerController.getLiteSubscriptionRegistry()).thenReturn(liteSubscriptionRegistry);
PopLiteMessageProcessor testObject = new
PopLiteMessageProcessor(brokerController, liteEventDispatcher);
@@ -383,18 +382,6 @@ public class PopLiteMessageProcessorTest {
verify(lockService).unlock(anyString());
}
- @Test
- public void testPopLiteTopic_lmqNotExist() {
- when(liteLifecycleManager.isLmqExist("lmqName")).thenReturn(false);
- brokerConfig.setEnableLiteEventMode(false);
-
- Pair<StringBuilder, GetMessageResult> result =
popLiteMessageProcessor.popLiteTopic("parentTopic",
- "clientHost", "group", "lmqName", 32L, System.currentTimeMillis(),
6000L, "attemptId");
-
- assertThat(result).isNull();
- verify(lockService, never()).tryLock(anyString());
- }
-
@Test
public void testPopLiteTopic_found() {
when(lockService.tryLock(anyString())).thenReturn(true);
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 38644659e1..3c0ba3f6ca 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -537,7 +537,6 @@ public class BrokerConfig extends BrokerIdentity {
private boolean enableCreateSysGroup = true;
- private boolean enableLiteEventMode = true;
private long liteEventCheckInterval = 10 * 1000;
@@ -2367,13 +2366,6 @@ public class BrokerConfig extends BrokerIdentity {
this.useSeparateRetryQueue = useSeparateRetryQueue;
}
- public boolean isEnableLiteEventMode() {
- return enableLiteEventMode;
- }
-
- public void setEnableLiteEventMode(boolean enableLiteEventMode) {
- this.enableLiteEventMode = enableLiteEventMode;
- }
public long getLiteEventCheckInterval() {
return liteEventCheckInterval;