This is an automated email from the ASF dual-hosted git repository.
lizhimins pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 1ab9689f34 [ISSUE #10450] Selective double-write in
CombineConsumeQueueStore (#10452)
1ab9689f34 is described below
commit 1ab9689f34db0b14c8f4c548cc5be5b52c83fc29
Author: imzs <[email protected]>
AuthorDate: Thu Jun 11 21:14:18 2026 +0800
[ISSUE #10450] Selective double-write in CombineConsumeQueueStore (#10452)
---
.../broker/processor/LiteManagerProcessor.java | 3 +-
.../rocketmq/store/config/MessageStoreConfig.java | 10 +
.../store/queue/CombineConsumeQueueStore.java | 38 ++++
.../store/queue/CombineConsumeQueueStoreTest.java | 211 ++++++++++++++-------
4 files changed, 194 insertions(+), 68 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/LiteManagerProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/LiteManagerProcessor.java
index 7d24d7f95d..63037b9fb7 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/LiteManagerProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/LiteManagerProcessor.java
@@ -109,7 +109,8 @@ public class LiteManagerProcessor implements
NettyRequestProcessor {
final RemotingCommand response =
RemotingCommand.createResponseCommand(null);
GetBrokerLiteInfoResponseBody body = new
GetBrokerLiteInfoResponseBody();
-
body.setStoreType(brokerController.getMessageStoreConfig().getStoreType());
+
body.setStoreType(brokerController.getMessageStoreConfig().isRocksdbCQDoubleWriteEnable()
?
+ "Combine" :
brokerController.getMessageStoreConfig().getStoreType());
body.setMaxLmqNum(brokerController.getMessageStoreConfig().getMaxLmqConsumeQueueNum());
body.setCurrentLmqNum(brokerController.getMessageStore().getQueueStore().getLmqNum());
body.setLiteSubscriptionCount(brokerController.getLiteSubscriptionRegistry().getActiveSubscriptionNum());
diff --git
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index fadc957e9d..8cb3b1c908 100644
---
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -473,6 +473,9 @@ public class MessageStoreConfig {
private boolean rocksdbCQDoubleWriteEnable = false;
+ // Secondary switch of rocksdbCQDoubleWriteEnable. In
CombineConsumeQueueStore, only specific topics will double-write CQ.
+ private boolean rocksdbCQSelectiveDoubleWriteEnable = false;
+
/**
* CombineConsumeQueueStore
* combineCQLoadingCQTypes is used to configure the loading types of CQ.
load / recover / start order: [default -> defaultRocksDB]
@@ -584,6 +587,13 @@ public class MessageStoreConfig {
this.rocksdbCQDoubleWriteEnable = rocksdbWriteEnable;
}
+ public boolean isRocksdbCQSelectiveDoubleWriteEnable() {
+ return rocksdbCQSelectiveDoubleWriteEnable;
+ }
+
+ public void setRocksdbCQSelectiveDoubleWriteEnable(boolean
rocksdbCQSelectiveDoubleWriteEnable) {
+ this.rocksdbCQSelectiveDoubleWriteEnable =
rocksdbCQSelectiveDoubleWriteEnable;
+ }
public boolean isEnabledAppendPropCRC() {
return enabledAppendPropCRC;
diff --git
a/store/src/main/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStore.java
b/store/src/main/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStore.java
index f266f9d57a..b074e7a09c 100644
---
a/store/src/main/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStore.java
+++
b/store/src/main/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStore.java
@@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
@@ -29,6 +30,8 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.CheckRocksdbCqWriteResult;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.attribute.TopicMessageType;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.common.Pair;
@@ -125,6 +128,14 @@ public class CombineConsumeQueueStore implements
ConsumeQueueStoreInterface {
throw new IllegalArgumentException("CombineConsumeQueueStore maybe
incorrect config");
}
+ if (messageStoreConfig.isRocksdbCQSelectiveDoubleWriteEnable() &&
assignOffsetStore != consumeQueueStore) {
+ throw new IllegalArgumentException("CombineConsumeQueueStore maybe
incorrect config");
+ }
+
+ if (messageStoreConfig.isRocksdbCQSelectiveDoubleWriteEnable() &&
currentReadStore != consumeQueueStore) {
+ throw new IllegalArgumentException("CombineConsumeQueueStore maybe
incorrect config");
+ }
+
log.info("CombineConsumeQueueStore init, consumeQueueStoreList={},
currentReadStore={}, assignOffsetStore={}, combineCQUseRocksdbForLmq={}",
innerConsumeQueueStoreList,
currentReadStore.getClass().getSimpleName(),
assignOffsetStore.getClass().getSimpleName(),
messageStoreConfig.isCombineCQUseRocksdbForLmq());
@@ -248,6 +259,10 @@ public class CombineConsumeQueueStore implements
ConsumeQueueStoreInterface {
continue;
}
+ if (abstractConsumeQueueStore == rocksDBConsumeQueueStore
&& !shouldDoubleWriteForTopic(topic)) {
+ continue;
+ }
+
ConsumeQueueInterface queue =
abstractConsumeQueueStore.findOrCreateConsumeQueue(topic, queueId);
long maxOffset0 = queue.getMaxOffsetInQueue();
@@ -347,6 +362,10 @@ public class CombineConsumeQueueStore implements
ConsumeQueueStoreInterface {
@Override
public void putMessagePositionInfoWrapper(DispatchRequest request) throws
RocksDBException {
for (AbstractConsumeQueueStore store : innerConsumeQueueStoreList) {
+ // Not all topics are double-written
+ if (store == rocksDBConsumeQueueStore &&
!shouldDoubleWriteForTopic(request.getTopic())) {
+ continue;
+ }
store.putMessagePositionInfoWrapper(request);
}
}
@@ -500,6 +519,11 @@ public class CombineConsumeQueueStore implements
ConsumeQueueStoreInterface {
private boolean processConsumeQueuesForTopic(ConcurrentMap<Integer,
ConsumeQueueInterface> queueMap, String topic,
AbstractConsumeQueueStore abstractConsumeQueueStore, StringBuilder
diffResult, boolean printDetail,
long checkpointByStoreTime) {
+
+ if (abstractConsumeQueueStore == rocksDBConsumeQueueStore &&
!shouldDoubleWriteForTopic(topic)) {
+ return true;
+ }
+
boolean processResult = true;
for (Map.Entry<Integer, ConsumeQueueInterface> queueEntry :
queueMap.entrySet()) {
Integer queueId = queueEntry.getKey();
@@ -601,4 +625,18 @@ public class CombineConsumeQueueStore implements
ConsumeQueueStoreInterface {
}
return currentReadStore;
}
+
+ /**
+ * Determines whether RocksDB CQ should be written for the given topic
under selective double-write mode.
+ * The file-based ConsumeQueueStore is treated as the base store.
+ * @param topic the topic name
+ * @return true if RocksDB CQ should be written for this topic
+ */
+ protected boolean shouldDoubleWriteForTopic(String topic) {
+ if (!messageStoreConfig.isRocksdbCQSelectiveDoubleWriteEnable()) {
+ return true;
+ }
+ Optional<TopicConfig> tc = messageStore.getTopicConfig(topic);
+ return tc.isPresent() && tc.get().getTopicMessageType() ==
TopicMessageType.LITE;
+ }
}
diff --git
a/store/src/test/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStoreTest.java
b/store/src/test/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStoreTest.java
index e7ac763a18..03186e2729 100644
---
a/store/src/test/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStoreTest.java
+++
b/store/src/test/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStoreTest.java
@@ -30,6 +30,7 @@ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.attribute.CQType;
+import org.apache.rocketmq.common.attribute.TopicMessageType;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
@@ -39,7 +40,6 @@ import org.apache.rocketmq.store.DispatchRequest;
import org.apache.rocketmq.store.StoreType;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.junit.After;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -47,9 +47,11 @@ import org.mockito.junit.MockitoJUnitRunner;
import static org.apache.rocketmq.common.TopicFilterType.SINGLE_TAG;
import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
@RunWith(MockitoJUnitRunner.class)
public class CombineConsumeQueueStoreTest extends QueueTestBase {
@@ -105,6 +107,24 @@ public class CombineConsumeQueueStoreTest extends
QueueTestBase {
new CombineConsumeQueueStore(messageStore);
}
+ @Test(expected = IllegalArgumentException.class)
+ public void
CombineConsumeQueueStore_selectiveDoubleWrite_assignOffsetStoreIsRocksdb_ThrowsException()
throws Exception {
+ messageStore = (DefaultMessageStore) createMessageStore(null, false,
topicConfigTableMap, messageStoreConfig);
+
+ messageStoreConfig.setRocksdbCQSelectiveDoubleWriteEnable(true);
+
messageStoreConfig.setCombineAssignOffsetCQType(StoreType.DEFAULT_ROCKSDB.getStoreType());
+ new CombineConsumeQueueStore(messageStore);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void
CombineConsumeQueueStore_selectiveDoubleWrite_readStoreIsRocksdb_ThrowsException()
throws Exception {
+ messageStore = (DefaultMessageStore) createMessageStore(null, false,
topicConfigTableMap, messageStoreConfig);
+
+ messageStoreConfig.setRocksdbCQSelectiveDoubleWriteEnable(true);
+
messageStoreConfig.setCombineCQPreferCQType(StoreType.DEFAULT_ROCKSDB.getStoreType());
+ new CombineConsumeQueueStore(messageStore);
+ }
+
@Test
public void CombineConsumeQueueStore_InitializesConsumeQueueStore() throws
Exception {
messageStore = (DefaultMessageStore) createMessageStore(null, false,
topicConfigTableMap, messageStoreConfig);
@@ -152,15 +172,15 @@ public class CombineConsumeQueueStoreTest extends
QueueTestBase {
messageStore.start();
//The initial min max offset, before and after the creation of consume
queue
- Assert.assertEquals(0, messageStore.getMaxOffsetInQueue(topic,
queueId));
- Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic,
queueId));
+ assertEquals(0, messageStore.getMaxOffsetInQueue(topic, queueId));
+ assertEquals(0, messageStore.getMinOffsetInQueue(topic, queueId));
ConsumeQueueInterface consumeQueue =
messageStore.getConsumeQueue(topic, queueId);
- Assert.assertEquals(CQType.SimpleCQ, consumeQueue.getCQType());
- Assert.assertEquals(0, consumeQueue.getMaxOffsetInQueue());
- Assert.assertEquals(0, consumeQueue.getMinOffsetInQueue());
- Assert.assertEquals(0, messageStore.getMaxOffsetInQueue(topic,
queueId));
- Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic,
queueId));
+ assertEquals(CQType.SimpleCQ, consumeQueue.getCQType());
+ assertEquals(0, consumeQueue.getMaxOffsetInQueue());
+ assertEquals(0, consumeQueue.getMinOffsetInQueue());
+ assertEquals(0, messageStore.getMaxOffsetInQueue(topic, queueId));
+ assertEquals(0, messageStore.getMinOffsetInQueue(topic, queueId));
for (int i = 0; i < msgNum; i++) {
DispatchRequest request = new DispatchRequest(topic, queueId, i *
msgSize, msgSize, i,
@@ -173,8 +193,8 @@ public class CombineConsumeQueueStoreTest extends
QueueTestBase {
CombineConsumeQueueStore combineConsumeQueueStore =
(CombineConsumeQueueStore) messageStore.getQueueStore();
ConsumeQueueInterface rocksDBConsumeQueue =
combineConsumeQueueStore.getRocksDBConsumeQueueStore().getConsumeQueue(topic,
queueId);
- Assert.assertEquals(CQType.RocksDBCQ,
rocksDBConsumeQueue.getCQType());
- Assert.assertEquals(msgNum,
rocksDBConsumeQueue.getMaxOffsetInQueue());
+ assertEquals(CQType.RocksDBCQ, rocksDBConsumeQueue.getCQType());
+ assertEquals(msgNum, rocksDBConsumeQueue.getMaxOffsetInQueue());
checkCQ(rocksDBConsumeQueue, msgNum, msgSize);
});
}
@@ -190,15 +210,15 @@ public class CombineConsumeQueueStoreTest extends
QueueTestBase {
messageStore.start();
String lmqName = MixAll.LMQ_PREFIX + UUID.randomUUID();
- Assert.assertEquals(0, messageStore.getMaxOffsetInQueue(lmqName,
queueId));
- Assert.assertEquals(0, messageStore.getMinOffsetInQueue(lmqName,
queueId));
+ assertEquals(0, messageStore.getMaxOffsetInQueue(lmqName, queueId));
+ assertEquals(0, messageStore.getMinOffsetInQueue(lmqName, queueId));
ConsumeQueueInterface consumeQueue =
messageStore.getConsumeQueue(lmqName, queueId);
- Assert.assertEquals(CQType.RocksDBCQ, consumeQueue.getCQType());
- Assert.assertEquals(0, consumeQueue.getMaxOffsetInQueue());
- Assert.assertEquals(0, consumeQueue.getMinOffsetInQueue());
- Assert.assertEquals(0, messageStore.getMaxOffsetInQueue(lmqName,
queueId));
- Assert.assertEquals(0, messageStore.getMinOffsetInQueue(lmqName,
queueId));
+ assertEquals(CQType.RocksDBCQ, consumeQueue.getCQType());
+ assertEquals(0, consumeQueue.getMaxOffsetInQueue());
+ assertEquals(0, consumeQueue.getMinOffsetInQueue());
+ assertEquals(0, messageStore.getMaxOffsetInQueue(lmqName, queueId));
+ assertEquals(0, messageStore.getMinOffsetInQueue(lmqName, queueId));
for (int i = 0; i < msgNum; i++) {
Map<String, String> propertyMap = new HashMap<>();
@@ -216,18 +236,18 @@ public class CombineConsumeQueueStoreTest extends
QueueTestBase {
ConsumeQueueInterface rocksDBConsumeQueue =
combineConsumeQueueStore.getRocksDBConsumeQueueStore().getConsumeQueue(lmqName,
queueId);
ConsumeQueueInterface fileConsumeQueue =
combineConsumeQueueStore.getConsumeQueueStore().getConsumeQueue(lmqName,
queueId);
- Assert.assertEquals(consumeQueue, rocksDBConsumeQueue);
- Assert.assertNull(fileConsumeQueue); // not exist in file CQ store
- Assert.assertEquals(msgNum,
rocksDBConsumeQueue.getMaxOffsetInQueue());
+ assertEquals(consumeQueue, rocksDBConsumeQueue);
+ assertNull(fileConsumeQueue); // not exist in file CQ store
+ assertEquals(msgNum, rocksDBConsumeQueue.getMaxOffsetInQueue());
});
}
private void checkCQ(ConsumeQueueInterface consumeQueue, int msgNum,
int msgSize) {
- Assert.assertEquals(0, consumeQueue.getMinLogicOffset());
- Assert.assertEquals(0, consumeQueue.getMinOffsetInQueue());
- Assert.assertEquals(msgNum, consumeQueue.getMaxOffsetInQueue());
- Assert.assertEquals(msgNum, consumeQueue.getMessageTotalInQueue());
+ assertEquals(0, consumeQueue.getMinLogicOffset());
+ assertEquals(0, consumeQueue.getMinOffsetInQueue());
+ assertEquals(msgNum, consumeQueue.getMaxOffsetInQueue());
+ assertEquals(msgNum, consumeQueue.getMessageTotalInQueue());
assertNull(consumeQueue.iterateFrom(-1));
assertNull(consumeQueue.iterateFrom(msgNum));
@@ -235,15 +255,15 @@ public class CombineConsumeQueueStoreTest extends
QueueTestBase {
{
CqUnit first = consumeQueue.getEarliestUnit();
assertNotNull(first);
- Assert.assertEquals(0, first.getQueueOffset());
- Assert.assertEquals(msgSize, first.getSize());
+ assertEquals(0, first.getQueueOffset());
+ assertEquals(msgSize, first.getSize());
assertTrue(first.isTagsCodeValid());
}
{
CqUnit last = consumeQueue.getLatestUnit();
assertNotNull(last);
- Assert.assertEquals(msgNum - 1, last.getQueueOffset());
- Assert.assertEquals(msgSize, last.getSize());
+ assertEquals(msgNum - 1, last.getQueueOffset());
+ assertEquals(msgSize, last.getSize());
assertTrue(last.isTagsCodeValid());
}
@@ -253,17 +273,17 @@ public class CombineConsumeQueueStoreTest extends
QueueTestBase {
long queueOffset = i;
while (iterator.hasNext()) {
CqUnit cqUnit = iterator.next();
- Assert.assertEquals(queueOffset, cqUnit.getQueueOffset());
- Assert.assertEquals(queueOffset * msgSize, cqUnit.getPos());
- Assert.assertEquals(msgSize, cqUnit.getSize());
+ assertEquals(queueOffset, cqUnit.getQueueOffset());
+ assertEquals(queueOffset * msgSize, cqUnit.getPos());
+ assertEquals(msgSize, cqUnit.getSize());
assertTrue(cqUnit.isTagsCodeValid());
- Assert.assertEquals(queueOffset, cqUnit.getTagsCode());
- Assert.assertEquals(queueOffset,
cqUnit.getValidTagsCodeAsLong().longValue());
- Assert.assertEquals(1, cqUnit.getBatchNum());
+ assertEquals(queueOffset, cqUnit.getTagsCode());
+ assertEquals(queueOffset,
cqUnit.getValidTagsCodeAsLong().longValue());
+ assertEquals(1, cqUnit.getBatchNum());
assertNull(cqUnit.getCqExtUnit());
queueOffset++;
}
- Assert.assertEquals(msgNum, queueOffset);
+ assertEquals(msgNum, queueOffset);
}
}
@@ -290,18 +310,18 @@ public class CombineConsumeQueueStoreTest extends
QueueTestBase {
consumeQueueStore.findOrCreateConsumeQueue(topic, queueId);
rocksDBConsumeQueue.initializeWithOffset(100, 0);
- Assert.assertEquals(100,
rocksDBConsumeQueue.getMaxOffsetInQueue());
- Assert.assertEquals(100,
rocksDBConsumeQueue.getMinOffsetInQueue());
+ assertEquals(100, rocksDBConsumeQueue.getMaxOffsetInQueue());
+ assertEquals(100, rocksDBConsumeQueue.getMinOffsetInQueue());
rocksDBConsumeQueue.initializeWithOffset(200, 0);
- Assert.assertEquals(200,
rocksDBConsumeQueue.getMaxOffsetInQueue());
- Assert.assertEquals(200,
rocksDBConsumeQueue.getMinOffsetInQueue());
+ assertEquals(200, rocksDBConsumeQueue.getMaxOffsetInQueue());
+ assertEquals(200, rocksDBConsumeQueue.getMinOffsetInQueue());
messageStore.start();
- Assert.assertEquals(0, rocksDBConsumeQueue.getMaxOffsetInQueue());
- Assert.assertEquals(0, rocksDBConsumeQueue.getMinOffsetInQueue());
+ assertEquals(0, rocksDBConsumeQueue.getMaxOffsetInQueue());
+ assertEquals(0, rocksDBConsumeQueue.getMinOffsetInQueue());
ConsumeQueue consumeQueue = (ConsumeQueue)
consumeQueueStore.findOrCreateConsumeQueue(topic, queueId);
@@ -324,8 +344,8 @@ public class CombineConsumeQueueStoreTest extends
QueueTestBase {
}
await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
- Assert.assertEquals(msgNum,
consumeQueue.getMaxOffsetInQueue());
- Assert.assertEquals(0, consumeQueue.getMinOffsetInQueue());
+ assertEquals(msgNum, consumeQueue.getMaxOffsetInQueue());
+ assertEquals(0, consumeQueue.getMinOffsetInQueue());
});
messageStore.shutdown();
@@ -347,13 +367,13 @@ public class CombineConsumeQueueStoreTest extends
QueueTestBase {
consumeQueueStore.findOrCreateConsumeQueue(topic,
queueId).initializeWithOffset(200, 0);
ConsumeQueueInterface cq =
rocksDBConsumeQueueStore.findOrCreateConsumeQueue(topic, queueId);
- Assert.assertEquals(msgNum, cq.getMaxOffsetInQueue());
- Assert.assertEquals(0, cq.getMinOffsetInQueue());
+ assertEquals(msgNum, cq.getMaxOffsetInQueue());
+ assertEquals(0, cq.getMinOffsetInQueue());
combineConsumeQueueStore.verifyAndInitOffsetForAllStore(true);
- Assert.assertEquals(200, cq.getMaxOffsetInQueue());
- Assert.assertEquals(200, cq.getMinOffsetInQueue());
+ assertEquals(200, cq.getMaxOffsetInQueue());
+ assertEquals(200, cq.getMinOffsetInQueue());
messageStore.shutdown();
}
@@ -393,8 +413,8 @@ public class CombineConsumeQueueStoreTest extends
QueueTestBase {
await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
File cq = new File(path + File.separator + "consumequeue" +
File.separator + topic + File.separator + queueId + File.separator +
"00000000000000000000");
assertTrue(cq.exists());
- Assert.assertEquals(msgNum, (long)
messageStore.getQueueStore().getMaxOffset(topic, queueId));
- Assert.assertEquals(0,
messageStore.getQueueStore().getMinOffsetInQueue(topic, queueId));
+ assertEquals(msgNum, (long)
messageStore.getQueueStore().getMaxOffset(topic, queueId));
+ assertEquals(0,
messageStore.getQueueStore().getMinOffsetInQueue(topic, queueId));
});
messageStore.shutdown();
@@ -437,14 +457,14 @@ public class CombineConsumeQueueStoreTest extends
QueueTestBase {
RocksDBConsumeQueueStore rocksDBConsumeQueueStore =
combineConsumeQueueStore.getRocksDBConsumeQueueStore();
String lmqName = MixAll.LMQ_PREFIX + UUID.randomUUID();
- Assert.assertEquals(0,
combineConsumeQueueStore.getLmqQueueOffset(lmqName, queueId));
- Assert.assertEquals(0, consumeQueueStore.getLmqQueueOffset(lmqName,
queueId));
- Assert.assertEquals(0,
rocksDBConsumeQueueStore.getLmqQueueOffset(lmqName, queueId));
+ assertEquals(0, combineConsumeQueueStore.getLmqQueueOffset(lmqName,
queueId));
+ assertEquals(0, consumeQueueStore.getLmqQueueOffset(lmqName, queueId));
+ assertEquals(0, rocksDBConsumeQueueStore.getLmqQueueOffset(lmqName,
queueId));
combineConsumeQueueStore.increaseLmqOffset(lmqName, queueId, (short)
100);
- Assert.assertEquals(100,
combineConsumeQueueStore.getLmqQueueOffset(lmqName, queueId));
- Assert.assertEquals(0, consumeQueueStore.getLmqQueueOffset(lmqName,
queueId));
- Assert.assertEquals(100,
rocksDBConsumeQueueStore.getLmqQueueOffset(lmqName, queueId));
+ assertEquals(100, combineConsumeQueueStore.getLmqQueueOffset(lmqName,
queueId));
+ assertEquals(0, consumeQueueStore.getLmqQueueOffset(lmqName, queueId));
+ assertEquals(100, rocksDBConsumeQueueStore.getLmqQueueOffset(lmqName,
queueId));
}
@Test
@@ -461,12 +481,12 @@ public class CombineConsumeQueueStoreTest extends
QueueTestBase {
RocksDBConsumeQueueStore rocksDBConsumeQueueStore =
combineConsumeQueueStore.getRocksDBConsumeQueueStore();
String lmqName = MixAll.LMQ_PREFIX + UUID.randomUUID();
- Assert.assertEquals(0, combineConsumeQueueStore.getLmqNum());
- Assert.assertEquals(0, rocksDBConsumeQueueStore.getLmqNum());
- Assert.assertEquals(0, consumeQueueStore.getLmqNum());
- Assert.assertFalse(combineConsumeQueueStore.isLmqExist(lmqName));
- Assert.assertFalse(rocksDBConsumeQueueStore.isLmqExist(lmqName));
- Assert.assertFalse(consumeQueueStore.isLmqExist(lmqName));
+ assertEquals(0, combineConsumeQueueStore.getLmqNum());
+ assertEquals(0, rocksDBConsumeQueueStore.getLmqNum());
+ assertEquals(0, consumeQueueStore.getLmqNum());
+ assertFalse(combineConsumeQueueStore.isLmqExist(lmqName));
+ assertFalse(rocksDBConsumeQueueStore.isLmqExist(lmqName));
+ assertFalse(consumeQueueStore.isLmqExist(lmqName));
for (int i = 0; i < msgNum; i++) {
Map<String, String> propertyMap = new HashMap<>();
@@ -478,13 +498,70 @@ public class CombineConsumeQueueStoreTest extends
QueueTestBase {
}
await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
- Assert.assertEquals(1, combineConsumeQueueStore.getLmqNum());
- Assert.assertEquals(1, rocksDBConsumeQueueStore.getLmqNum());
- Assert.assertEquals(0, consumeQueueStore.getLmqNum());
+ assertEquals(1, combineConsumeQueueStore.getLmqNum());
+ assertEquals(1, rocksDBConsumeQueueStore.getLmqNum());
+ assertEquals(0, consumeQueueStore.getLmqNum());
+
+ assertTrue(combineConsumeQueueStore.isLmqExist(lmqName));
+ assertTrue(rocksDBConsumeQueueStore.isLmqExist(lmqName));
+ assertFalse(consumeQueueStore.isLmqExist(lmqName));
+ });
+ }
- Assert.assertTrue(combineConsumeQueueStore.isLmqExist(lmqName));
- Assert.assertTrue(rocksDBConsumeQueueStore.isLmqExist(lmqName));
- Assert.assertFalse(consumeQueueStore.isLmqExist(lmqName));
+ @Test
+ public void testSelectiveDoubleWrite() throws Exception {
+ messageStoreConfig.setRocksdbCQDoubleWriteEnable(true);
+ messageStoreConfig.setRocksdbCQSelectiveDoubleWriteEnable(true);
+ messageStore = (DefaultMessageStore) createMessageStore(null, false,
topicConfigTableMap, messageStoreConfig);
+ messageStore.load();
+ messageStore.start();
+ CombineConsumeQueueStore combineConsumeQueueStore =
(CombineConsumeQueueStore) messageStore.getQueueStore();
+ ConsumeQueueStore consumeQueueStore =
combineConsumeQueueStore.getConsumeQueueStore();
+ RocksDBConsumeQueueStore rocksDBConsumeQueueStore =
combineConsumeQueueStore.getRocksDBConsumeQueueStore();
+
+ String liteTopic = "LiteTopic";
+ String normalTopic = "NormalTopic";
+ String retryTopic = MixAll.RETRY_GROUP_TOPIC_PREFIX + "CID_GROUP" +
"+" + normalTopic;
+
+ topicConfigTableMap.put(liteTopic, new TopicConfig(liteTopic, 1, 1,
PermName.PERM_WRITE | PermName.PERM_READ));
+
topicConfigTableMap.get(liteTopic).setTopicMessageType(TopicMessageType.LITE);
+ topicConfigTableMap.put(normalTopic, new TopicConfig(normalTopic, 1,
1, PermName.PERM_WRITE | PermName.PERM_READ));
+ topicConfigTableMap.put(retryTopic, new TopicConfig(retryTopic, 1, 1,
PermName.PERM_WRITE | PermName.PERM_READ));
+
+ ConsumeQueueInterface liteTopicCQ =
combineConsumeQueueStore.getConsumeQueue(liteTopic, queueId);
+ ConsumeQueueInterface normalTopicCQ =
combineConsumeQueueStore.getConsumeQueue(normalTopic, queueId);
+ ConsumeQueueInterface retryTopicCQ =
combineConsumeQueueStore.getConsumeQueue(retryTopic, queueId);
+ assertNull(liteTopicCQ);
+ assertNull(normalTopicCQ);
+ assertNull(retryTopicCQ);
+
+ DispatchRequest request1 = new DispatchRequest(liteTopic, queueId,
msgSize, msgSize, 0,
+ System.currentTimeMillis(), 0, null, null, 0, 0, null);
+ messageStore.getQueueStore().putMessagePositionInfoWrapper(request1);
+
+ DispatchRequest request2 = new DispatchRequest(normalTopic, queueId, 2
* msgSize, msgSize, 0,
+ System.currentTimeMillis(), 0, null, null, 0, 0, null);
+ messageStore.getQueueStore().putMessagePositionInfoWrapper(request2);
+
+ DispatchRequest request3 = new DispatchRequest(retryTopic, queueId, 3
* msgSize, msgSize, 0,
+ System.currentTimeMillis(), 0, null, null, 0, 0, null);
+ messageStore.getQueueStore().putMessagePositionInfoWrapper(request3);
+
+ await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+ ConsumeQueueInterface liteTopicFileCQ =
consumeQueueStore.getConsumeQueue(liteTopic, queueId);
+ ConsumeQueueInterface liteTopicRocksdbCQ =
rocksDBConsumeQueueStore.getConsumeQueue(liteTopic, queueId);
+ assertEquals(1L, liteTopicFileCQ.getMaxOffsetInQueue());
+ assertEquals(1L, liteTopicRocksdbCQ.getMaxOffsetInQueue());
+
+ ConsumeQueueInterface normalTopicFileCQ =
consumeQueueStore.getConsumeQueue(normalTopic, queueId);
+ ConsumeQueueInterface normalTopicRocksdbCQ =
rocksDBConsumeQueueStore.getConsumeQueue(normalTopic, queueId);
+ assertEquals(1L, normalTopicFileCQ.getMaxOffsetInQueue());
+ assertEquals(0L, normalTopicRocksdbCQ.getMaxOffsetInQueue());
+
+ ConsumeQueueInterface retryTopicFileCQ =
consumeQueueStore.getConsumeQueue(retryTopic, queueId);
+ ConsumeQueueInterface retryTopicRocksdbCQ =
rocksDBConsumeQueueStore.getConsumeQueue(retryTopic, queueId);
+ assertEquals(1L, retryTopicFileCQ.getMaxOffsetInQueue());
+ assertEquals(0L, retryTopicRocksdbCQ.getMaxOffsetInQueue());
});
}
}