This is an automated email from the ASF dual-hosted git repository.

lizhimins pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 1ab9689f34 [ISSUE #10450] Selective double-write in 
CombineConsumeQueueStore (#10452)
1ab9689f34 is described below

commit 1ab9689f34db0b14c8f4c548cc5be5b52c83fc29
Author: imzs <[email protected]>
AuthorDate: Thu Jun 11 21:14:18 2026 +0800

    [ISSUE #10450] Selective double-write in CombineConsumeQueueStore (#10452)
---
 .../broker/processor/LiteManagerProcessor.java     |   3 +-
 .../rocketmq/store/config/MessageStoreConfig.java  |  10 +
 .../store/queue/CombineConsumeQueueStore.java      |  38 ++++
 .../store/queue/CombineConsumeQueueStoreTest.java  | 211 ++++++++++++++-------
 4 files changed, 194 insertions(+), 68 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/LiteManagerProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/LiteManagerProcessor.java
index 7d24d7f95d..63037b9fb7 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/LiteManagerProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/LiteManagerProcessor.java
@@ -109,7 +109,8 @@ public class LiteManagerProcessor implements 
NettyRequestProcessor {
         final RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
 
         GetBrokerLiteInfoResponseBody body = new 
GetBrokerLiteInfoResponseBody();
-        
body.setStoreType(brokerController.getMessageStoreConfig().getStoreType());
+        
body.setStoreType(brokerController.getMessageStoreConfig().isRocksdbCQDoubleWriteEnable()
 ?
+            "Combine" : 
brokerController.getMessageStoreConfig().getStoreType());
         
body.setMaxLmqNum(brokerController.getMessageStoreConfig().getMaxLmqConsumeQueueNum());
         
body.setCurrentLmqNum(brokerController.getMessageStore().getQueueStore().getLmqNum());
         
body.setLiteSubscriptionCount(brokerController.getLiteSubscriptionRegistry().getActiveSubscriptionNum());
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index fadc957e9d..8cb3b1c908 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -473,6 +473,9 @@ public class MessageStoreConfig {
 
     private boolean rocksdbCQDoubleWriteEnable = false;
 
+    // Secondary switch of rocksdbCQDoubleWriteEnable. In 
CombineConsumeQueueStore, only specific topics will double-write CQ.
+    private boolean rocksdbCQSelectiveDoubleWriteEnable = false;
+
     /**
      * CombineConsumeQueueStore
      * combineCQLoadingCQTypes is used to configure the loading types of CQ. 
load / recover / start order: [default -> defaultRocksDB]
@@ -584,6 +587,13 @@ public class MessageStoreConfig {
         this.rocksdbCQDoubleWriteEnable = rocksdbWriteEnable;
     }
 
+    public boolean isRocksdbCQSelectiveDoubleWriteEnable() {
+        return rocksdbCQSelectiveDoubleWriteEnable;
+    }
+
+    public void setRocksdbCQSelectiveDoubleWriteEnable(boolean 
rocksdbCQSelectiveDoubleWriteEnable) {
+        this.rocksdbCQSelectiveDoubleWriteEnable = 
rocksdbCQSelectiveDoubleWriteEnable;
+    }
 
     public boolean isEnabledAppendPropCRC() {
         return enabledAppendPropCRC;
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStore.java
 
b/store/src/main/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStore.java
index f266f9d57a..b074e7a09c 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStore.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStore.java
@@ -22,6 +22,7 @@ import com.google.common.annotations.VisibleForTesting;
 import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -29,6 +30,8 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.BoundaryType;
 import org.apache.rocketmq.common.CheckRocksdbCqWriteResult;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.attribute.TopicMessageType;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.common.Pair;
@@ -125,6 +128,14 @@ public class CombineConsumeQueueStore implements 
ConsumeQueueStoreInterface {
             throw new IllegalArgumentException("CombineConsumeQueueStore maybe 
incorrect config");
         }
 
+        if (messageStoreConfig.isRocksdbCQSelectiveDoubleWriteEnable() && 
assignOffsetStore != consumeQueueStore) {
+            throw new IllegalArgumentException("CombineConsumeQueueStore maybe 
incorrect config");
+        }
+
+        if (messageStoreConfig.isRocksdbCQSelectiveDoubleWriteEnable() && 
currentReadStore != consumeQueueStore) {
+            throw new IllegalArgumentException("CombineConsumeQueueStore maybe 
incorrect config");
+        }
+
         log.info("CombineConsumeQueueStore init, consumeQueueStoreList={}, 
currentReadStore={}, assignOffsetStore={}, combineCQUseRocksdbForLmq={}",
             innerConsumeQueueStoreList, 
currentReadStore.getClass().getSimpleName(),
             assignOffsetStore.getClass().getSimpleName(), 
messageStoreConfig.isCombineCQUseRocksdbForLmq());
@@ -248,6 +259,10 @@ public class CombineConsumeQueueStore implements 
ConsumeQueueStoreInterface {
                         continue;
                     }
 
+                    if (abstractConsumeQueueStore == rocksDBConsumeQueueStore 
&& !shouldDoubleWriteForTopic(topic)) {
+                        continue;
+                    }
+
                     ConsumeQueueInterface queue = 
abstractConsumeQueueStore.findOrCreateConsumeQueue(topic, queueId);
                     long maxOffset0 = queue.getMaxOffsetInQueue();
 
@@ -347,6 +362,10 @@ public class CombineConsumeQueueStore implements 
ConsumeQueueStoreInterface {
     @Override
     public void putMessagePositionInfoWrapper(DispatchRequest request) throws 
RocksDBException {
         for (AbstractConsumeQueueStore store : innerConsumeQueueStoreList) {
+            // Not all topics are double-written
+            if (store == rocksDBConsumeQueueStore && 
!shouldDoubleWriteForTopic(request.getTopic())) {
+                continue;
+            }
             store.putMessagePositionInfoWrapper(request);
         }
     }
@@ -500,6 +519,11 @@ public class CombineConsumeQueueStore implements 
ConsumeQueueStoreInterface {
     private boolean processConsumeQueuesForTopic(ConcurrentMap<Integer, 
ConsumeQueueInterface> queueMap, String topic,
         AbstractConsumeQueueStore abstractConsumeQueueStore, StringBuilder 
diffResult, boolean printDetail,
         long checkpointByStoreTime) {
+
+        if (abstractConsumeQueueStore == rocksDBConsumeQueueStore && 
!shouldDoubleWriteForTopic(topic)) {
+            return true;
+        }
+
         boolean processResult = true;
         for (Map.Entry<Integer, ConsumeQueueInterface> queueEntry : 
queueMap.entrySet()) {
             Integer queueId = queueEntry.getKey();
@@ -601,4 +625,18 @@ public class CombineConsumeQueueStore implements 
ConsumeQueueStoreInterface {
         }
         return currentReadStore;
     }
+
+    /**
+     * Determines whether RocksDB CQ should be written for the given topic 
under selective double-write mode.
+     * The file-based ConsumeQueueStore is treated as the base store.
+     * @param topic the topic name
+     * @return true if RocksDB CQ should be written for this topic
+     */
+    protected boolean shouldDoubleWriteForTopic(String topic) {
+        if (!messageStoreConfig.isRocksdbCQSelectiveDoubleWriteEnable()) {
+            return true;
+        }
+        Optional<TopicConfig> tc = messageStore.getTopicConfig(topic);
+        return tc.isPresent() && tc.get().getTopicMessageType() == 
TopicMessageType.LITE;
+    }
 }
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStoreTest.java
 
b/store/src/test/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStoreTest.java
index e7ac763a18..03186e2729 100644
--- 
a/store/src/test/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStoreTest.java
+++ 
b/store/src/test/java/org/apache/rocketmq/store/queue/CombineConsumeQueueStoreTest.java
@@ -30,6 +30,7 @@ import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.attribute.CQType;
+import org.apache.rocketmq.common.attribute.TopicMessageType;
 import org.apache.rocketmq.common.constant.PermName;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageExtBrokerInner;
@@ -39,7 +40,6 @@ import org.apache.rocketmq.store.DispatchRequest;
 import org.apache.rocketmq.store.StoreType;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
 import org.junit.After;
-import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -47,9 +47,11 @@ import org.mockito.junit.MockitoJUnitRunner;
 
 import static org.apache.rocketmq.common.TopicFilterType.SINGLE_TAG;
 import static org.awaitility.Awaitility.await;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
 
 @RunWith(MockitoJUnitRunner.class)
 public class CombineConsumeQueueStoreTest extends QueueTestBase {
@@ -105,6 +107,24 @@ public class CombineConsumeQueueStoreTest extends 
QueueTestBase {
         new CombineConsumeQueueStore(messageStore);
     }
 
+    @Test(expected = IllegalArgumentException.class)
+    public void 
CombineConsumeQueueStore_selectiveDoubleWrite_assignOffsetStoreIsRocksdb_ThrowsException()
 throws Exception {
+        messageStore = (DefaultMessageStore) createMessageStore(null, false, 
topicConfigTableMap, messageStoreConfig);
+
+        messageStoreConfig.setRocksdbCQSelectiveDoubleWriteEnable(true);
+        
messageStoreConfig.setCombineAssignOffsetCQType(StoreType.DEFAULT_ROCKSDB.getStoreType());
+        new CombineConsumeQueueStore(messageStore);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void 
CombineConsumeQueueStore_selectiveDoubleWrite_readStoreIsRocksdb_ThrowsException()
 throws Exception {
+        messageStore = (DefaultMessageStore) createMessageStore(null, false, 
topicConfigTableMap, messageStoreConfig);
+
+        messageStoreConfig.setRocksdbCQSelectiveDoubleWriteEnable(true);
+        
messageStoreConfig.setCombineCQPreferCQType(StoreType.DEFAULT_ROCKSDB.getStoreType());
+        new CombineConsumeQueueStore(messageStore);
+    }
+
     @Test
     public void CombineConsumeQueueStore_InitializesConsumeQueueStore() throws 
Exception {
         messageStore = (DefaultMessageStore) createMessageStore(null, false, 
topicConfigTableMap, messageStoreConfig);
@@ -152,15 +172,15 @@ public class CombineConsumeQueueStoreTest extends 
QueueTestBase {
         messageStore.start();
 
         //The initial min max offset, before and after the creation of consume 
queue
-        Assert.assertEquals(0, messageStore.getMaxOffsetInQueue(topic, 
queueId));
-        Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 
queueId));
+        assertEquals(0, messageStore.getMaxOffsetInQueue(topic, queueId));
+        assertEquals(0, messageStore.getMinOffsetInQueue(topic, queueId));
 
         ConsumeQueueInterface consumeQueue = 
messageStore.getConsumeQueue(topic, queueId);
-        Assert.assertEquals(CQType.SimpleCQ, consumeQueue.getCQType());
-        Assert.assertEquals(0, consumeQueue.getMaxOffsetInQueue());
-        Assert.assertEquals(0, consumeQueue.getMinOffsetInQueue());
-        Assert.assertEquals(0, messageStore.getMaxOffsetInQueue(topic, 
queueId));
-        Assert.assertEquals(0, messageStore.getMinOffsetInQueue(topic, 
queueId));
+        assertEquals(CQType.SimpleCQ, consumeQueue.getCQType());
+        assertEquals(0, consumeQueue.getMaxOffsetInQueue());
+        assertEquals(0, consumeQueue.getMinOffsetInQueue());
+        assertEquals(0, messageStore.getMaxOffsetInQueue(topic, queueId));
+        assertEquals(0, messageStore.getMinOffsetInQueue(topic, queueId));
 
         for (int i = 0; i < msgNum; i++) {
             DispatchRequest request = new DispatchRequest(topic, queueId, i * 
msgSize, msgSize, i,
@@ -173,8 +193,8 @@ public class CombineConsumeQueueStoreTest extends 
QueueTestBase {
 
             CombineConsumeQueueStore combineConsumeQueueStore = 
(CombineConsumeQueueStore) messageStore.getQueueStore();
             ConsumeQueueInterface rocksDBConsumeQueue = 
combineConsumeQueueStore.getRocksDBConsumeQueueStore().getConsumeQueue(topic, 
queueId);
-            Assert.assertEquals(CQType.RocksDBCQ, 
rocksDBConsumeQueue.getCQType());
-            Assert.assertEquals(msgNum, 
rocksDBConsumeQueue.getMaxOffsetInQueue());
+            assertEquals(CQType.RocksDBCQ, rocksDBConsumeQueue.getCQType());
+            assertEquals(msgNum, rocksDBConsumeQueue.getMaxOffsetInQueue());
             checkCQ(rocksDBConsumeQueue, msgNum, msgSize);
         });
     }
@@ -190,15 +210,15 @@ public class CombineConsumeQueueStoreTest extends 
QueueTestBase {
         messageStore.start();
 
         String lmqName = MixAll.LMQ_PREFIX + UUID.randomUUID();
-        Assert.assertEquals(0, messageStore.getMaxOffsetInQueue(lmqName, 
queueId));
-        Assert.assertEquals(0, messageStore.getMinOffsetInQueue(lmqName, 
queueId));
+        assertEquals(0, messageStore.getMaxOffsetInQueue(lmqName, queueId));
+        assertEquals(0, messageStore.getMinOffsetInQueue(lmqName, queueId));
 
         ConsumeQueueInterface consumeQueue = 
messageStore.getConsumeQueue(lmqName, queueId);
-        Assert.assertEquals(CQType.RocksDBCQ, consumeQueue.getCQType());
-        Assert.assertEquals(0, consumeQueue.getMaxOffsetInQueue());
-        Assert.assertEquals(0, consumeQueue.getMinOffsetInQueue());
-        Assert.assertEquals(0, messageStore.getMaxOffsetInQueue(lmqName, 
queueId));
-        Assert.assertEquals(0, messageStore.getMinOffsetInQueue(lmqName, 
queueId));
+        assertEquals(CQType.RocksDBCQ, consumeQueue.getCQType());
+        assertEquals(0, consumeQueue.getMaxOffsetInQueue());
+        assertEquals(0, consumeQueue.getMinOffsetInQueue());
+        assertEquals(0, messageStore.getMaxOffsetInQueue(lmqName, queueId));
+        assertEquals(0, messageStore.getMinOffsetInQueue(lmqName, queueId));
 
         for (int i = 0; i < msgNum; i++) {
             Map<String, String> propertyMap = new HashMap<>();
@@ -216,18 +236,18 @@ public class CombineConsumeQueueStoreTest extends 
QueueTestBase {
             ConsumeQueueInterface rocksDBConsumeQueue = 
combineConsumeQueueStore.getRocksDBConsumeQueueStore().getConsumeQueue(lmqName, 
queueId);
             ConsumeQueueInterface fileConsumeQueue = 
combineConsumeQueueStore.getConsumeQueueStore().getConsumeQueue(lmqName, 
queueId);
 
-            Assert.assertEquals(consumeQueue, rocksDBConsumeQueue);
-            Assert.assertNull(fileConsumeQueue); // not exist in file CQ store
-            Assert.assertEquals(msgNum, 
rocksDBConsumeQueue.getMaxOffsetInQueue());
+            assertEquals(consumeQueue, rocksDBConsumeQueue);
+            assertNull(fileConsumeQueue); // not exist in file CQ store
+            assertEquals(msgNum, rocksDBConsumeQueue.getMaxOffsetInQueue());
         });
     }
 
     private void checkCQ(ConsumeQueueInterface consumeQueue, int msgNum,
         int msgSize) {
-        Assert.assertEquals(0, consumeQueue.getMinLogicOffset());
-        Assert.assertEquals(0, consumeQueue.getMinOffsetInQueue());
-        Assert.assertEquals(msgNum, consumeQueue.getMaxOffsetInQueue());
-        Assert.assertEquals(msgNum, consumeQueue.getMessageTotalInQueue());
+        assertEquals(0, consumeQueue.getMinLogicOffset());
+        assertEquals(0, consumeQueue.getMinOffsetInQueue());
+        assertEquals(msgNum, consumeQueue.getMaxOffsetInQueue());
+        assertEquals(msgNum, consumeQueue.getMessageTotalInQueue());
 
         assertNull(consumeQueue.iterateFrom(-1));
         assertNull(consumeQueue.iterateFrom(msgNum));
@@ -235,15 +255,15 @@ public class CombineConsumeQueueStoreTest extends 
QueueTestBase {
         {
             CqUnit first = consumeQueue.getEarliestUnit();
             assertNotNull(first);
-            Assert.assertEquals(0, first.getQueueOffset());
-            Assert.assertEquals(msgSize, first.getSize());
+            assertEquals(0, first.getQueueOffset());
+            assertEquals(msgSize, first.getSize());
             assertTrue(first.isTagsCodeValid());
         }
         {
             CqUnit last = consumeQueue.getLatestUnit();
             assertNotNull(last);
-            Assert.assertEquals(msgNum - 1, last.getQueueOffset());
-            Assert.assertEquals(msgSize, last.getSize());
+            assertEquals(msgNum - 1, last.getQueueOffset());
+            assertEquals(msgSize, last.getSize());
             assertTrue(last.isTagsCodeValid());
         }
 
@@ -253,17 +273,17 @@ public class CombineConsumeQueueStoreTest extends 
QueueTestBase {
             long queueOffset = i;
             while (iterator.hasNext()) {
                 CqUnit cqUnit = iterator.next();
-                Assert.assertEquals(queueOffset, cqUnit.getQueueOffset());
-                Assert.assertEquals(queueOffset * msgSize, cqUnit.getPos());
-                Assert.assertEquals(msgSize, cqUnit.getSize());
+                assertEquals(queueOffset, cqUnit.getQueueOffset());
+                assertEquals(queueOffset * msgSize, cqUnit.getPos());
+                assertEquals(msgSize, cqUnit.getSize());
                 assertTrue(cqUnit.isTagsCodeValid());
-                Assert.assertEquals(queueOffset, cqUnit.getTagsCode());
-                Assert.assertEquals(queueOffset, 
cqUnit.getValidTagsCodeAsLong().longValue());
-                Assert.assertEquals(1, cqUnit.getBatchNum());
+                assertEquals(queueOffset, cqUnit.getTagsCode());
+                assertEquals(queueOffset, 
cqUnit.getValidTagsCodeAsLong().longValue());
+                assertEquals(1, cqUnit.getBatchNum());
                 assertNull(cqUnit.getCqExtUnit());
                 queueOffset++;
             }
-            Assert.assertEquals(msgNum, queueOffset);
+            assertEquals(msgNum, queueOffset);
         }
     }
 
@@ -290,18 +310,18 @@ public class CombineConsumeQueueStoreTest extends 
QueueTestBase {
             consumeQueueStore.findOrCreateConsumeQueue(topic, queueId);
             rocksDBConsumeQueue.initializeWithOffset(100, 0);
 
-            Assert.assertEquals(100, 
rocksDBConsumeQueue.getMaxOffsetInQueue());
-            Assert.assertEquals(100, 
rocksDBConsumeQueue.getMinOffsetInQueue());
+            assertEquals(100, rocksDBConsumeQueue.getMaxOffsetInQueue());
+            assertEquals(100, rocksDBConsumeQueue.getMinOffsetInQueue());
 
             rocksDBConsumeQueue.initializeWithOffset(200, 0);
 
-            Assert.assertEquals(200, 
rocksDBConsumeQueue.getMaxOffsetInQueue());
-            Assert.assertEquals(200, 
rocksDBConsumeQueue.getMinOffsetInQueue());
+            assertEquals(200, rocksDBConsumeQueue.getMaxOffsetInQueue());
+            assertEquals(200, rocksDBConsumeQueue.getMinOffsetInQueue());
 
             messageStore.start();
 
-            Assert.assertEquals(0, rocksDBConsumeQueue.getMaxOffsetInQueue());
-            Assert.assertEquals(0, rocksDBConsumeQueue.getMinOffsetInQueue());
+            assertEquals(0, rocksDBConsumeQueue.getMaxOffsetInQueue());
+            assertEquals(0, rocksDBConsumeQueue.getMinOffsetInQueue());
 
             ConsumeQueue consumeQueue = (ConsumeQueue) 
consumeQueueStore.findOrCreateConsumeQueue(topic, queueId);
 
@@ -324,8 +344,8 @@ public class CombineConsumeQueueStoreTest extends 
QueueTestBase {
             }
 
             await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
-                Assert.assertEquals(msgNum, 
consumeQueue.getMaxOffsetInQueue());
-                Assert.assertEquals(0, consumeQueue.getMinOffsetInQueue());
+                assertEquals(msgNum, consumeQueue.getMaxOffsetInQueue());
+                assertEquals(0, consumeQueue.getMinOffsetInQueue());
             });
 
             messageStore.shutdown();
@@ -347,13 +367,13 @@ public class CombineConsumeQueueStoreTest extends 
QueueTestBase {
             consumeQueueStore.findOrCreateConsumeQueue(topic, 
queueId).initializeWithOffset(200, 0);
 
             ConsumeQueueInterface cq = 
rocksDBConsumeQueueStore.findOrCreateConsumeQueue(topic, queueId);
-            Assert.assertEquals(msgNum, cq.getMaxOffsetInQueue());
-            Assert.assertEquals(0, cq.getMinOffsetInQueue());
+            assertEquals(msgNum, cq.getMaxOffsetInQueue());
+            assertEquals(0, cq.getMinOffsetInQueue());
 
             combineConsumeQueueStore.verifyAndInitOffsetForAllStore(true);
 
-            Assert.assertEquals(200, cq.getMaxOffsetInQueue());
-            Assert.assertEquals(200, cq.getMinOffsetInQueue());
+            assertEquals(200, cq.getMaxOffsetInQueue());
+            assertEquals(200, cq.getMinOffsetInQueue());
 
             messageStore.shutdown();
         }
@@ -393,8 +413,8 @@ public class CombineConsumeQueueStoreTest extends 
QueueTestBase {
             await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
                 File cq = new File(path + File.separator + "consumequeue" + 
File.separator + topic + File.separator + queueId + File.separator + 
"00000000000000000000");
                 assertTrue(cq.exists());
-                Assert.assertEquals(msgNum, (long) 
messageStore.getQueueStore().getMaxOffset(topic, queueId));
-                Assert.assertEquals(0, 
messageStore.getQueueStore().getMinOffsetInQueue(topic, queueId));
+                assertEquals(msgNum, (long) 
messageStore.getQueueStore().getMaxOffset(topic, queueId));
+                assertEquals(0, 
messageStore.getQueueStore().getMinOffsetInQueue(topic, queueId));
             });
 
             messageStore.shutdown();
@@ -437,14 +457,14 @@ public class CombineConsumeQueueStoreTest extends 
QueueTestBase {
         RocksDBConsumeQueueStore rocksDBConsumeQueueStore = 
combineConsumeQueueStore.getRocksDBConsumeQueueStore();
 
         String lmqName = MixAll.LMQ_PREFIX + UUID.randomUUID();
-        Assert.assertEquals(0, 
combineConsumeQueueStore.getLmqQueueOffset(lmqName, queueId));
-        Assert.assertEquals(0, consumeQueueStore.getLmqQueueOffset(lmqName, 
queueId));
-        Assert.assertEquals(0, 
rocksDBConsumeQueueStore.getLmqQueueOffset(lmqName, queueId));
+        assertEquals(0, combineConsumeQueueStore.getLmqQueueOffset(lmqName, 
queueId));
+        assertEquals(0, consumeQueueStore.getLmqQueueOffset(lmqName, queueId));
+        assertEquals(0, rocksDBConsumeQueueStore.getLmqQueueOffset(lmqName, 
queueId));
 
         combineConsumeQueueStore.increaseLmqOffset(lmqName, queueId, (short) 
100);
-        Assert.assertEquals(100, 
combineConsumeQueueStore.getLmqQueueOffset(lmqName, queueId));
-        Assert.assertEquals(0, consumeQueueStore.getLmqQueueOffset(lmqName, 
queueId));
-        Assert.assertEquals(100, 
rocksDBConsumeQueueStore.getLmqQueueOffset(lmqName, queueId));
+        assertEquals(100, combineConsumeQueueStore.getLmqQueueOffset(lmqName, 
queueId));
+        assertEquals(0, consumeQueueStore.getLmqQueueOffset(lmqName, queueId));
+        assertEquals(100, rocksDBConsumeQueueStore.getLmqQueueOffset(lmqName, 
queueId));
     }
 
     @Test
@@ -461,12 +481,12 @@ public class CombineConsumeQueueStoreTest extends 
QueueTestBase {
         RocksDBConsumeQueueStore rocksDBConsumeQueueStore = 
combineConsumeQueueStore.getRocksDBConsumeQueueStore();
 
         String lmqName = MixAll.LMQ_PREFIX + UUID.randomUUID();
-        Assert.assertEquals(0, combineConsumeQueueStore.getLmqNum());
-        Assert.assertEquals(0, rocksDBConsumeQueueStore.getLmqNum());
-        Assert.assertEquals(0, consumeQueueStore.getLmqNum());
-        Assert.assertFalse(combineConsumeQueueStore.isLmqExist(lmqName));
-        Assert.assertFalse(rocksDBConsumeQueueStore.isLmqExist(lmqName));
-        Assert.assertFalse(consumeQueueStore.isLmqExist(lmqName));
+        assertEquals(0, combineConsumeQueueStore.getLmqNum());
+        assertEquals(0, rocksDBConsumeQueueStore.getLmqNum());
+        assertEquals(0, consumeQueueStore.getLmqNum());
+        assertFalse(combineConsumeQueueStore.isLmqExist(lmqName));
+        assertFalse(rocksDBConsumeQueueStore.isLmqExist(lmqName));
+        assertFalse(consumeQueueStore.isLmqExist(lmqName));
 
         for (int i = 0; i < msgNum; i++) {
             Map<String, String> propertyMap = new HashMap<>();
@@ -478,13 +498,70 @@ public class CombineConsumeQueueStoreTest extends 
QueueTestBase {
         }
 
         await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
-            Assert.assertEquals(1, combineConsumeQueueStore.getLmqNum());
-            Assert.assertEquals(1, rocksDBConsumeQueueStore.getLmqNum());
-            Assert.assertEquals(0, consumeQueueStore.getLmqNum());
+            assertEquals(1, combineConsumeQueueStore.getLmqNum());
+            assertEquals(1, rocksDBConsumeQueueStore.getLmqNum());
+            assertEquals(0, consumeQueueStore.getLmqNum());
+
+            assertTrue(combineConsumeQueueStore.isLmqExist(lmqName));
+            assertTrue(rocksDBConsumeQueueStore.isLmqExist(lmqName));
+            assertFalse(consumeQueueStore.isLmqExist(lmqName));
+        });
+    }
 
-            Assert.assertTrue(combineConsumeQueueStore.isLmqExist(lmqName));
-            Assert.assertTrue(rocksDBConsumeQueueStore.isLmqExist(lmqName));
-            Assert.assertFalse(consumeQueueStore.isLmqExist(lmqName));
+    @Test
+    public void testSelectiveDoubleWrite() throws Exception {
+        messageStoreConfig.setRocksdbCQDoubleWriteEnable(true);
+        messageStoreConfig.setRocksdbCQSelectiveDoubleWriteEnable(true);
+        messageStore = (DefaultMessageStore) createMessageStore(null, false, 
topicConfigTableMap, messageStoreConfig);
+        messageStore.load();
+        messageStore.start();
+        CombineConsumeQueueStore combineConsumeQueueStore = 
(CombineConsumeQueueStore) messageStore.getQueueStore();
+        ConsumeQueueStore consumeQueueStore = 
combineConsumeQueueStore.getConsumeQueueStore();
+        RocksDBConsumeQueueStore rocksDBConsumeQueueStore = 
combineConsumeQueueStore.getRocksDBConsumeQueueStore();
+
+        String liteTopic = "LiteTopic";
+        String normalTopic = "NormalTopic";
+        String retryTopic = MixAll.RETRY_GROUP_TOPIC_PREFIX + "CID_GROUP" + 
"+" + normalTopic;
+
+        topicConfigTableMap.put(liteTopic, new TopicConfig(liteTopic, 1, 1, 
PermName.PERM_WRITE | PermName.PERM_READ));
+        
topicConfigTableMap.get(liteTopic).setTopicMessageType(TopicMessageType.LITE);
+        topicConfigTableMap.put(normalTopic, new TopicConfig(normalTopic, 1, 
1, PermName.PERM_WRITE | PermName.PERM_READ));
+        topicConfigTableMap.put(retryTopic, new TopicConfig(retryTopic, 1, 1, 
PermName.PERM_WRITE | PermName.PERM_READ));
+
+        ConsumeQueueInterface liteTopicCQ = 
combineConsumeQueueStore.getConsumeQueue(liteTopic, queueId);
+        ConsumeQueueInterface normalTopicCQ = 
combineConsumeQueueStore.getConsumeQueue(normalTopic, queueId);
+        ConsumeQueueInterface retryTopicCQ = 
combineConsumeQueueStore.getConsumeQueue(retryTopic, queueId);
+        assertNull(liteTopicCQ);
+        assertNull(normalTopicCQ);
+        assertNull(retryTopicCQ);
+
+        DispatchRequest request1 = new DispatchRequest(liteTopic, queueId, 
msgSize, msgSize, 0,
+            System.currentTimeMillis(), 0, null, null, 0, 0, null);
+        messageStore.getQueueStore().putMessagePositionInfoWrapper(request1);
+
+        DispatchRequest request2 = new DispatchRequest(normalTopic, queueId, 2 
* msgSize, msgSize, 0,
+            System.currentTimeMillis(), 0, null, null, 0, 0, null);
+        messageStore.getQueueStore().putMessagePositionInfoWrapper(request2);
+
+        DispatchRequest request3 = new DispatchRequest(retryTopic, queueId, 3 
* msgSize, msgSize, 0,
+            System.currentTimeMillis(), 0, null, null, 0, 0, null);
+        messageStore.getQueueStore().putMessagePositionInfoWrapper(request3);
+
+        await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
+            ConsumeQueueInterface liteTopicFileCQ = 
consumeQueueStore.getConsumeQueue(liteTopic, queueId);
+            ConsumeQueueInterface liteTopicRocksdbCQ = 
rocksDBConsumeQueueStore.getConsumeQueue(liteTopic, queueId);
+            assertEquals(1L, liteTopicFileCQ.getMaxOffsetInQueue());
+            assertEquals(1L, liteTopicRocksdbCQ.getMaxOffsetInQueue());
+
+            ConsumeQueueInterface normalTopicFileCQ = 
consumeQueueStore.getConsumeQueue(normalTopic, queueId);
+            ConsumeQueueInterface normalTopicRocksdbCQ = 
rocksDBConsumeQueueStore.getConsumeQueue(normalTopic, queueId);
+            assertEquals(1L, normalTopicFileCQ.getMaxOffsetInQueue());
+            assertEquals(0L, normalTopicRocksdbCQ.getMaxOffsetInQueue());
+
+            ConsumeQueueInterface retryTopicFileCQ = 
consumeQueueStore.getConsumeQueue(retryTopic, queueId);
+            ConsumeQueueInterface retryTopicRocksdbCQ = 
rocksDBConsumeQueueStore.getConsumeQueue(retryTopic, queueId);
+            assertEquals(1L, retryTopicFileCQ.getMaxOffsetInQueue());
+            assertEquals(0L, retryTopicRocksdbCQ.getMaxOffsetInQueue());
         });
     }
 }

Reply via email to