This is an automated email from the ASF dual-hosted git repository.

lizhimins pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 9e2d877055 [ISSUE #10375] Fix race condition between deleteTopic and FlushConsumeQueueService by removing getLifeCycle indirection (#10376)
9e2d877055 is described below

commit 9e2d8770558d7037e78a79d4ea52eb3cb0d988aa
Author: Quan <[email protected]>
AuthorDate: Mon May 25 23:13:40 2026 +0800

    [ISSUE #10375] Fix race condition between deleteTopic and FlushConsumeQueueService by removing getLifeCycle indirection (#10376)
---
 .../broker/lite/LiteLifecycleManagerTest.java      |   7 +-
 .../rocketmq/store/queue/ConsumeQueueStore.java    | 123 +++++++++++++--------
 2 files changed, 80 insertions(+), 50 deletions(-)

diff --git a/broker/src/test/java/org/apache/rocketmq/broker/lite/LiteLifecycleManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/lite/LiteLifecycleManagerTest.java
index 312b8a29fa..00dcb79c8d 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/lite/LiteLifecycleManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/lite/LiteLifecycleManagerTest.java
@@ -18,6 +18,7 @@
 package org.apache.rocketmq.broker.lite;
 
 import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
 import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
 import org.apache.rocketmq.broker.topic.TopicConfigManager;
 import org.apache.rocketmq.common.BrokerConfig;
@@ -76,11 +77,16 @@ public class LiteLifecycleManagerTest {
         LiteSharding liteSharding = Mockito.mock(LiteSharding.class);
         TopicConfigManager topicConfigManager = 
Mockito.mock(TopicConfigManager.class);
         SubscriptionGroupManager subscriptionGroupManager = 
Mockito.mock(SubscriptionGroupManager.class);
+        LiteSubscriptionRegistry liteSubscriptionRegistry = 
Mockito.mock(LiteSubscriptionRegistry.class);
+        ConsumerOffsetManager consumerOffsetManager = 
Mockito.mock(ConsumerOffsetManager.class);
+        when(consumerOffsetManager.getPullOffsetTable()).thenReturn(new 
ConcurrentHashMap<>());
 
         when(brokerController.getBrokerConfig()).thenReturn(BROKER_CONFIG);
         when(brokerController.getMessageStore()).thenReturn(messageStore);
         
when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager);
         
when(brokerController.getSubscriptionGroupManager()).thenReturn(subscriptionGroupManager);
+        
when(brokerController.getLiteSubscriptionRegistry()).thenReturn(liteSubscriptionRegistry);
+        
when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager);
         
when(topicConfigManager.getTopicConfigTable()).thenReturn(TOPIC_CONFIG_TABLE);
         
when(topicConfigManager.selectTopicConfig(anyString())).thenReturn(mockTopicConfig);
         
when(subscriptionGroupManager.getSubscriptionGroupTable()).thenReturn(new 
ConcurrentHashMap<>());
@@ -177,7 +183,6 @@ public class LiteLifecycleManagerTest {
         }
     }
 
-    @Ignore("Flaky: fails 2/100 runs (2.0%)")
     @Test
     public void testCleanByParentTopic() {
         int num = 3;
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
index 7a5616bab7..950aa48308 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
@@ -99,7 +99,7 @@ public class ConsumeQueueStore extends 
AbstractConsumeQueueStore {
         } else {
             for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : 
this.consumeQueueTable.values()) {
                 for (ConsumeQueueInterface logic : maps.values()) {
-                    this.recover(logic);
+                    logic.recover();
                 }
             }
         }
@@ -111,7 +111,7 @@ public class ConsumeQueueStore extends 
AbstractConsumeQueueStore {
      * from it.
      */
     @Override
-    public Long getDispatchFromPhyOffset(boolean recoverNormally) throws 
RocksDBException {
+    public Long getDispatchFromPhyOffset(boolean recoverNormally) {
         if (recoverNormally) {
             return getMaxPhyOffsetInConsumeQueue();
         } else {
@@ -127,7 +127,7 @@ public class ConsumeQueueStore extends 
AbstractConsumeQueueStore {
     public boolean recoverConcurrently() {
         int count = 0;
         for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : 
this.consumeQueueTable.values()) {
-            count += maps.values().size();
+            count += maps.size();
         }
         final CountDownLatch countDownLatch = new CountDownLatch(count);
         BlockingQueue<Runnable> recoverQueue = new LinkedBlockingQueue<>();
@@ -206,15 +206,6 @@ public class ConsumeQueueStore extends 
AbstractConsumeQueueStore {
         return 0;
     }
 
-    private FileQueueLifeCycle getLifeCycle(String topic, int queueId) {
-        return findOrCreateConsumeQueue(topic, queueId);
-    }
-
-    public boolean load(ConsumeQueueInterface consumeQueue) {
-        FileQueueLifeCycle fileQueueLifeCycle = 
getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
-        return fileQueueLifeCycle.load();
-    }
-
     private boolean loadConsumeQueues(String storePath, CQType cqType) {
         File dirLogic = new File(storePath);
         File[] fileTopicList = dirLogic.listFiles();
@@ -237,7 +228,7 @@ public class ConsumeQueueStore extends 
AbstractConsumeQueueStore {
 
                         ConsumeQueueInterface logic = 
createConsumeQueueByType(cqType, topic, queueId, storePath);
                         this.putConsumeQueue(topic, queueId, logic);
-                        if (!this.load(logic)) {
+                        if (!logic.load()) {
                             return false;
                         }
                     }
@@ -291,11 +282,6 @@ public class ConsumeQueueStore extends 
AbstractConsumeQueueStore {
             new ThreadFactoryImpl(threadNamePrefix));
     }
 
-    public void recover(ConsumeQueueInterface consumeQueue) {
-        FileQueueLifeCycle fileQueueLifeCycle = 
getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
-        fileQueueLifeCycle.recover();
-    }
-
     @Override
     public long getMaxPhyOffsetInConsumeQueue() {
         long maxPhysicOffset = -1L;
@@ -319,71 +305,110 @@ public class ConsumeQueueStore extends 
AbstractConsumeQueueStore {
         return -1;
     }
 
-    public void checkSelf(ConsumeQueueInterface consumeQueue) {
-        FileQueueLifeCycle fileQueueLifeCycle = 
getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
-        fileQueueLifeCycle.checkSelf();
-    }
-
     @Override
     public void checkSelf() {
         for (Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>> 
topicEntry : this.consumeQueueTable.entrySet()) {
             for (Map.Entry<Integer, ConsumeQueueInterface> cqEntry : 
topicEntry.getValue().entrySet()) {
-                this.checkSelf(cqEntry.getValue());
+                cqEntry.getValue().checkSelf();
             }
         }
     }
 
-    public boolean flush(ConsumeQueueInterface consumeQueue, int 
flushLeastPages) {
-        FileQueueLifeCycle fileQueueLifeCycle = 
getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
-        return fileQueueLifeCycle.flush(flushLeastPages);
-    }
-
     public void flush() throws StoreException {
         for (Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>> 
topicEntry : this.consumeQueueTable.entrySet()) {
             for (Map.Entry<Integer, ConsumeQueueInterface> cqEntry : 
topicEntry.getValue().entrySet()) {
-                flush(cqEntry.getValue(), 0);
+                cqEntry.getValue().flush(0);
             }
         }
     }
 
     @Override
     public void destroy(ConsumeQueueInterface consumeQueue) {
-        FileQueueLifeCycle fileQueueLifeCycle = 
getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
-        fileQueueLifeCycle.destroy();
+        consumeQueue.destroy();
         if (MixAll.isLmq(consumeQueue.getTopic())) {
             lmqCounter.decrementAndGet();
         }
     }
 
+    /**
+     * @deprecated Use {@link ConsumeQueueInterface#load()} directly instead.
+     */
+    @Deprecated
+    public boolean load(ConsumeQueueInterface consumeQueue) {
+        return consumeQueue.load();
+    }
+
+    /**
+     * @deprecated Use {@link ConsumeQueueInterface#recover()} directly instead.
+     */
+    @Deprecated
+    public void recover(ConsumeQueueInterface consumeQueue) {
+        consumeQueue.recover();
+    }
+
+    /**
+     * @deprecated Use {@link ConsumeQueueInterface#checkSelf()} directly instead.
+     */
+    @Deprecated
+    public void checkSelf(ConsumeQueueInterface consumeQueue) {
+        consumeQueue.checkSelf();
+    }
+
+    /**
+     * @deprecated Use {@link ConsumeQueueInterface#flush(int)} directly instead.
+     */
+    @Deprecated
+    public boolean flush(ConsumeQueueInterface consumeQueue, int 
flushLeastPages) {
+        return consumeQueue.flush(flushLeastPages);
+    }
+
+    /**
+     * @deprecated Use {@link ConsumeQueueInterface#deleteExpiredFile(long)} directly instead.
+     */
+    @Deprecated
     public int deleteExpiredFile(ConsumeQueueInterface consumeQueue, long 
minCommitLogPos) {
-        FileQueueLifeCycle fileQueueLifeCycle = 
getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
-        return fileQueueLifeCycle.deleteExpiredFile(minCommitLogPos);
+        return consumeQueue.deleteExpiredFile(minCommitLogPos);
     }
 
+    /**
+     * @deprecated Use {@link ConsumeQueueInterface#truncateDirtyLogicFiles(long)} directly instead.
+     */
+    @Deprecated
     public void truncateDirtyLogicFiles(ConsumeQueueInterface consumeQueue, 
long phyOffset) {
-        FileQueueLifeCycle fileQueueLifeCycle = 
getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
-        fileQueueLifeCycle.truncateDirtyLogicFiles(phyOffset);
+        consumeQueue.truncateDirtyLogicFiles(phyOffset);
     }
 
+    /**
+     * @deprecated Use {@link ConsumeQueueInterface#swapMap(int, long, long)} directly instead.
+     */
+    @Deprecated
     public void swapMap(ConsumeQueueInterface consumeQueue, int reserveNum, 
long forceSwapIntervalMs,
         long normalSwapIntervalMs) {
-        FileQueueLifeCycle fileQueueLifeCycle = 
getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
-        fileQueueLifeCycle.swapMap(reserveNum, forceSwapIntervalMs, 
normalSwapIntervalMs);
+        consumeQueue.swapMap(reserveNum, forceSwapIntervalMs, 
normalSwapIntervalMs);
     }
 
+    /**
+     * @deprecated Use {@link ConsumeQueueInterface#cleanSwappedMap(long)} directly instead.
+     */
+    @Deprecated
     public void cleanSwappedMap(ConsumeQueueInterface consumeQueue, long 
forceCleanSwapIntervalMs) {
-        FileQueueLifeCycle fileQueueLifeCycle = 
getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
-        fileQueueLifeCycle.cleanSwappedMap(forceCleanSwapIntervalMs);
+        consumeQueue.cleanSwappedMap(forceCleanSwapIntervalMs);
     }
 
+    /**
+     * @deprecated Use {@link ConsumeQueueInterface#isFirstFileAvailable()} directly instead.
+     */
+    @Deprecated
     public boolean isFirstFileAvailable(ConsumeQueueInterface consumeQueue) {
-        FileQueueLifeCycle fileQueueLifeCycle = 
getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
-        return fileQueueLifeCycle.isFirstFileAvailable();
+        return consumeQueue.isFirstFileAvailable();
     }
 
+    /**
+     * @deprecated Use {@link ConsumeQueueInterface#isFirstFileExist()} directly instead.
+     */
+    @Deprecated
     public boolean isFirstFileExist(ConsumeQueueInterface consumeQueue) {
-        FileQueueLifeCycle fileQueueLifeCycle = 
getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
-        return fileQueueLifeCycle.isFirstFileExist();
+        return consumeQueue.isFirstFileExist();
     }
 
     @Override
@@ -604,7 +629,7 @@ public class ConsumeQueueStore extends 
AbstractConsumeQueueStore {
             log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), 
truncate dirty logic files", maxPhyOffsetOfConsumeQueue, offsetToTruncate);
             for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : 
this.consumeQueueTable.values()) {
                 for (ConsumeQueueInterface logic : maps.values()) {
-                    this.truncateDirtyLogicFiles(logic, offsetToTruncate);
+                    logic.truncateDirtyLogicFiles(offsetToTruncate);
                 }
             }
         }
@@ -667,7 +692,7 @@ public class ConsumeQueueStore extends 
AbstractConsumeQueueStore {
                 for (ConsumeQueueInterface cq : maps.values()) {
                     boolean result = false;
                     for (int i = 0; i < retryTimes && !result; i++) {
-                        result = flush(cq, flushConsumeQueueLeastPages);
+                        result = cq.flush(flushConsumeQueueLeastPages);
                     }
                 }
             }
@@ -736,7 +761,7 @@ public class ConsumeQueueStore extends 
AbstractConsumeQueueStore {
                 return false;
             }
             // If first exist and not available, it means first file may 
destroy failed, delete it.
-            if (isFirstFileExist(logic) && !isFirstFileAvailable(logic)) {
+            if (logic.isFirstFileExist() && !logic.isFirstFileAvailable()) {
                 log.error("CorrectLogicOffsetService.needCorrect. first file 
not available, trigger correct." +
                         " topic:{}, queue:{}, maxPhyOffset in queue:{}, 
minPhyOffset " +
                         "in commit log:{}, minOffset in queue:{}, maxOffset in 
queue:{}, cqType:{}"
@@ -821,7 +846,7 @@ public class ConsumeQueueStore extends 
AbstractConsumeQueueStore {
         }
 
         private void doCorrect(ConsumeQueueInterface logic, long minPhyOffset) 
{
-            deleteExpiredFile(logic, minPhyOffset);
+            logic.deleteExpiredFile(minPhyOffset);
             int sleepIntervalWhenCorrectMinOffset = 
messageStoreConfig.getCorrectLogicMinOffsetSleepInterval();
             if (sleepIntervalWhenCorrectMinOffset > 0) {
                 try {
@@ -859,7 +884,7 @@ public class ConsumeQueueStore extends 
AbstractConsumeQueueStore {
 
                 for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : 
consumeQueueTable.values()) {
                     for (ConsumeQueueInterface logic : maps.values()) {
-                        int deleteCount = deleteExpiredFile(logic, minOffset);
+                        int deleteCount = logic.deleteExpiredFile(minOffset);
                         if (deleteCount > 0 && deleteLogicsFilesInterval > 0) {
                             try {
                                 Thread.sleep(deleteLogicsFilesInterval);

Reply via email to