This is an automated email from the ASF dual-hosted git repository.
lizhimins pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 9e2d877055 [ISSUE #10375] Fix race condition between deleteTopic and
FlushConsumeQueueService by removing getLifeCycle indirection (#10376)
9e2d877055 is described below
commit 9e2d8770558d7037e78a79d4ea52eb3cb0d988aa
Author: Quan <[email protected]>
AuthorDate: Mon May 25 23:13:40 2026 +0800
[ISSUE #10375] Fix race condition between deleteTopic and
FlushConsumeQueueService by removing getLifeCycle indirection (#10376)
---
.../broker/lite/LiteLifecycleManagerTest.java | 7 +-
.../rocketmq/store/queue/ConsumeQueueStore.java | 123 +++++++++++++--------
2 files changed, 80 insertions(+), 50 deletions(-)
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/lite/LiteLifecycleManagerTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/lite/LiteLifecycleManagerTest.java
index 312b8a29fa..00dcb79c8d 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/lite/LiteLifecycleManagerTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/lite/LiteLifecycleManagerTest.java
@@ -18,6 +18,7 @@
package org.apache.rocketmq.broker.lite;
import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.common.BrokerConfig;
@@ -76,11 +77,16 @@ public class LiteLifecycleManagerTest {
LiteSharding liteSharding = Mockito.mock(LiteSharding.class);
TopicConfigManager topicConfigManager =
Mockito.mock(TopicConfigManager.class);
SubscriptionGroupManager subscriptionGroupManager =
Mockito.mock(SubscriptionGroupManager.class);
+ LiteSubscriptionRegistry liteSubscriptionRegistry =
Mockito.mock(LiteSubscriptionRegistry.class);
+ ConsumerOffsetManager consumerOffsetManager =
Mockito.mock(ConsumerOffsetManager.class);
+ when(consumerOffsetManager.getPullOffsetTable()).thenReturn(new
ConcurrentHashMap<>());
when(brokerController.getBrokerConfig()).thenReturn(BROKER_CONFIG);
when(brokerController.getMessageStore()).thenReturn(messageStore);
when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager);
when(brokerController.getSubscriptionGroupManager()).thenReturn(subscriptionGroupManager);
+
when(brokerController.getLiteSubscriptionRegistry()).thenReturn(liteSubscriptionRegistry);
+
when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager);
when(topicConfigManager.getTopicConfigTable()).thenReturn(TOPIC_CONFIG_TABLE);
when(topicConfigManager.selectTopicConfig(anyString())).thenReturn(mockTopicConfig);
when(subscriptionGroupManager.getSubscriptionGroupTable()).thenReturn(new
ConcurrentHashMap<>());
@@ -177,7 +183,6 @@ public class LiteLifecycleManagerTest {
}
}
- @Ignore("Flaky: fails 2/100 runs (2.0%)")
@Test
public void testCleanByParentTopic() {
int num = 3;
diff --git
a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
index 7a5616bab7..950aa48308 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
@@ -99,7 +99,7 @@ public class ConsumeQueueStore extends
AbstractConsumeQueueStore {
} else {
for (ConcurrentMap<Integer, ConsumeQueueInterface> maps :
this.consumeQueueTable.values()) {
for (ConsumeQueueInterface logic : maps.values()) {
- this.recover(logic);
+ logic.recover();
}
}
}
@@ -111,7 +111,7 @@ public class ConsumeQueueStore extends
AbstractConsumeQueueStore {
* from it.
*/
@Override
- public Long getDispatchFromPhyOffset(boolean recoverNormally) throws
RocksDBException {
+ public Long getDispatchFromPhyOffset(boolean recoverNormally) {
if (recoverNormally) {
return getMaxPhyOffsetInConsumeQueue();
} else {
@@ -127,7 +127,7 @@ public class ConsumeQueueStore extends
AbstractConsumeQueueStore {
public boolean recoverConcurrently() {
int count = 0;
for (ConcurrentMap<Integer, ConsumeQueueInterface> maps :
this.consumeQueueTable.values()) {
- count += maps.values().size();
+ count += maps.size();
}
final CountDownLatch countDownLatch = new CountDownLatch(count);
BlockingQueue<Runnable> recoverQueue = new LinkedBlockingQueue<>();
@@ -206,15 +206,6 @@ public class ConsumeQueueStore extends
AbstractConsumeQueueStore {
return 0;
}
- private FileQueueLifeCycle getLifeCycle(String topic, int queueId) {
- return findOrCreateConsumeQueue(topic, queueId);
- }
-
- public boolean load(ConsumeQueueInterface consumeQueue) {
- FileQueueLifeCycle fileQueueLifeCycle =
getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
- return fileQueueLifeCycle.load();
- }
-
private boolean loadConsumeQueues(String storePath, CQType cqType) {
File dirLogic = new File(storePath);
File[] fileTopicList = dirLogic.listFiles();
@@ -237,7 +228,7 @@ public class ConsumeQueueStore extends
AbstractConsumeQueueStore {
ConsumeQueueInterface logic =
createConsumeQueueByType(cqType, topic, queueId, storePath);
this.putConsumeQueue(topic, queueId, logic);
- if (!this.load(logic)) {
+ if (!logic.load()) {
return false;
}
}
@@ -291,11 +282,6 @@ public class ConsumeQueueStore extends
AbstractConsumeQueueStore {
new ThreadFactoryImpl(threadNamePrefix));
}
- public void recover(ConsumeQueueInterface consumeQueue) {
- FileQueueLifeCycle fileQueueLifeCycle =
getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
- fileQueueLifeCycle.recover();
- }
-
@Override
public long getMaxPhyOffsetInConsumeQueue() {
long maxPhysicOffset = -1L;
@@ -319,71 +305,110 @@ public class ConsumeQueueStore extends
AbstractConsumeQueueStore {
return -1;
}
- public void checkSelf(ConsumeQueueInterface consumeQueue) {
- FileQueueLifeCycle fileQueueLifeCycle =
getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
- fileQueueLifeCycle.checkSelf();
- }
-
@Override
public void checkSelf() {
for (Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>>
topicEntry : this.consumeQueueTable.entrySet()) {
for (Map.Entry<Integer, ConsumeQueueInterface> cqEntry :
topicEntry.getValue().entrySet()) {
- this.checkSelf(cqEntry.getValue());
+ cqEntry.getValue().checkSelf();
}
}
}
- public boolean flush(ConsumeQueueInterface consumeQueue, int
flushLeastPages) {
- FileQueueLifeCycle fileQueueLifeCycle =
getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
- return fileQueueLifeCycle.flush(flushLeastPages);
- }
-
public void flush() throws StoreException {
for (Map.Entry<String, ConcurrentMap<Integer, ConsumeQueueInterface>>
topicEntry : this.consumeQueueTable.entrySet()) {
for (Map.Entry<Integer, ConsumeQueueInterface> cqEntry :
topicEntry.getValue().entrySet()) {
- flush(cqEntry.getValue(), 0);
+ cqEntry.getValue().flush(0);
}
}
}
@Override
public void destroy(ConsumeQueueInterface consumeQueue) {
- FileQueueLifeCycle fileQueueLifeCycle =
getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
- fileQueueLifeCycle.destroy();
+ consumeQueue.destroy();
if (MixAll.isLmq(consumeQueue.getTopic())) {
lmqCounter.decrementAndGet();
}
}
+ /**
+ * @deprecated Use {@link ConsumeQueueInterface#load()} directly instead.
+ */
+ @Deprecated
+ public boolean load(ConsumeQueueInterface consumeQueue) {
+ return consumeQueue.load();
+ }
+
+ /**
+ * @deprecated Use {@link ConsumeQueueInterface#recover()} directly
instead.
+ */
+ @Deprecated
+ public void recover(ConsumeQueueInterface consumeQueue) {
+ consumeQueue.recover();
+ }
+
+ /**
+ * @deprecated Use {@link ConsumeQueueInterface#checkSelf()} directly
instead.
+ */
+ @Deprecated
+ public void checkSelf(ConsumeQueueInterface consumeQueue) {
+ consumeQueue.checkSelf();
+ }
+
+ /**
+ * @deprecated Use {@link ConsumeQueueInterface#flush(int)} directly
instead.
+ */
+ @Deprecated
+ public boolean flush(ConsumeQueueInterface consumeQueue, int
flushLeastPages) {
+ return consumeQueue.flush(flushLeastPages);
+ }
+
+ /**
+ * @deprecated Use {@link ConsumeQueueInterface#deleteExpiredFile(long)}
directly instead.
+ */
+ @Deprecated
public int deleteExpiredFile(ConsumeQueueInterface consumeQueue, long
minCommitLogPos) {
- FileQueueLifeCycle fileQueueLifeCycle =
getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
- return fileQueueLifeCycle.deleteExpiredFile(minCommitLogPos);
+ return consumeQueue.deleteExpiredFile(minCommitLogPos);
}
+ /**
+ * @deprecated Use {@link
ConsumeQueueInterface#truncateDirtyLogicFiles(long)} directly instead.
+ */
+ @Deprecated
public void truncateDirtyLogicFiles(ConsumeQueueInterface consumeQueue,
long phyOffset) {
- FileQueueLifeCycle fileQueueLifeCycle =
getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
- fileQueueLifeCycle.truncateDirtyLogicFiles(phyOffset);
+ consumeQueue.truncateDirtyLogicFiles(phyOffset);
}
+ /**
+ * @deprecated Use {@link ConsumeQueueInterface#swapMap(int, long, long)}
directly instead.
+ */
+ @Deprecated
public void swapMap(ConsumeQueueInterface consumeQueue, int reserveNum,
long forceSwapIntervalMs,
long normalSwapIntervalMs) {
- FileQueueLifeCycle fileQueueLifeCycle =
getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
- fileQueueLifeCycle.swapMap(reserveNum, forceSwapIntervalMs,
normalSwapIntervalMs);
+ consumeQueue.swapMap(reserveNum, forceSwapIntervalMs,
normalSwapIntervalMs);
}
+ /**
+ * @deprecated Use {@link ConsumeQueueInterface#cleanSwappedMap(long)}
directly instead.
+ */
+ @Deprecated
public void cleanSwappedMap(ConsumeQueueInterface consumeQueue, long
forceCleanSwapIntervalMs) {
- FileQueueLifeCycle fileQueueLifeCycle =
getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
- fileQueueLifeCycle.cleanSwappedMap(forceCleanSwapIntervalMs);
+ consumeQueue.cleanSwappedMap(forceCleanSwapIntervalMs);
}
+ /**
+ * @deprecated Use {@link ConsumeQueueInterface#isFirstFileAvailable()}
directly instead.
+ */
+ @Deprecated
public boolean isFirstFileAvailable(ConsumeQueueInterface consumeQueue) {
- FileQueueLifeCycle fileQueueLifeCycle =
getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
- return fileQueueLifeCycle.isFirstFileAvailable();
+ return consumeQueue.isFirstFileAvailable();
}
+ /**
+ * @deprecated Use {@link ConsumeQueueInterface#isFirstFileExist()}
directly instead.
+ */
+ @Deprecated
public boolean isFirstFileExist(ConsumeQueueInterface consumeQueue) {
- FileQueueLifeCycle fileQueueLifeCycle =
getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
- return fileQueueLifeCycle.isFirstFileExist();
+ return consumeQueue.isFirstFileExist();
}
@Override
@@ -604,7 +629,7 @@ public class ConsumeQueueStore extends
AbstractConsumeQueueStore {
log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}),
truncate dirty logic files", maxPhyOffsetOfConsumeQueue, offsetToTruncate);
for (ConcurrentMap<Integer, ConsumeQueueInterface> maps :
this.consumeQueueTable.values()) {
for (ConsumeQueueInterface logic : maps.values()) {
- this.truncateDirtyLogicFiles(logic, offsetToTruncate);
+ logic.truncateDirtyLogicFiles(offsetToTruncate);
}
}
}
@@ -667,7 +692,7 @@ public class ConsumeQueueStore extends
AbstractConsumeQueueStore {
for (ConsumeQueueInterface cq : maps.values()) {
boolean result = false;
for (int i = 0; i < retryTimes && !result; i++) {
- result = flush(cq, flushConsumeQueueLeastPages);
+ result = cq.flush(flushConsumeQueueLeastPages);
}
}
}
@@ -736,7 +761,7 @@ public class ConsumeQueueStore extends
AbstractConsumeQueueStore {
return false;
}
// If first exist and not available, it means first file may
destroy failed, delete it.
- if (isFirstFileExist(logic) && !isFirstFileAvailable(logic)) {
+ if (logic.isFirstFileExist() && !logic.isFirstFileAvailable()) {
log.error("CorrectLogicOffsetService.needCorrect. first file
not available, trigger correct." +
" topic:{}, queue:{}, maxPhyOffset in queue:{},
minPhyOffset " +
"in commit log:{}, minOffset in queue:{}, maxOffset in
queue:{}, cqType:{}"
@@ -821,7 +846,7 @@ public class ConsumeQueueStore extends
AbstractConsumeQueueStore {
}
private void doCorrect(ConsumeQueueInterface logic, long minPhyOffset)
{
- deleteExpiredFile(logic, minPhyOffset);
+ logic.deleteExpiredFile(minPhyOffset);
int sleepIntervalWhenCorrectMinOffset =
messageStoreConfig.getCorrectLogicMinOffsetSleepInterval();
if (sleepIntervalWhenCorrectMinOffset > 0) {
try {
@@ -859,7 +884,7 @@ public class ConsumeQueueStore extends
AbstractConsumeQueueStore {
for (ConcurrentMap<Integer, ConsumeQueueInterface> maps :
consumeQueueTable.values()) {
for (ConsumeQueueInterface logic : maps.values()) {
- int deleteCount = deleteExpiredFile(logic, minOffset);
+ int deleteCount = logic.deleteExpiredFile(minOffset);
if (deleteCount > 0 && deleteLogicsFilesInterval > 0) {
try {
Thread.sleep(deleteLogicsFilesInterval);