This is an automated email from the ASF dual-hosted git repository.
lizhimins pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new cf9113fce9 [ISSUE #10462] Fix resource leaks and lifecycle issues in tiered storage (#10476)
cf9113fce9 is described below
commit cf9113fce91158e2c9d32f1dc38af687946307d6
Author: lizhimins <[email protected]>
AuthorDate: Thu Jun 11 20:47:54 2026 +0800
[ISSUE #10462] Fix resource leaks and lifecycle issues in tiered storage (#10476)
---
.../rocketmq/tieredstore/MessageStoreExecutor.java | 56 ++++++++++++----------
.../rocketmq/tieredstore/TieredMessageStore.java | 14 +++---
.../core/MessageStoreDispatcherImpl.java | 4 +-
.../tieredstore/core/MessageStoreFetcherImpl.java | 2 +-
.../rocketmq/tieredstore/file/FlatFileFactory.java | 8 ----
.../rocketmq/tieredstore/file/FlatFileStore.java | 2 +-
.../rocketmq/tieredstore/file/FlatMessageFile.java | 5 --
.../rocketmq/tieredstore/provider/FileSegment.java | 9 ++++
.../tieredstore/provider/PosixFileSegment.java | 22 +++++++--
.../tieredstore/TieredMessageStoreTest.java | 1 +
.../core/MessageStoreDispatcherImplTest.java | 11 +++--
.../tieredstore/file/FlatAppendFileTest.java | 6 ++-
.../tieredstore/file/FlatCommitLogFileTest.java | 6 ++-
.../tieredstore/file/FlatFileFactoryTest.java | 5 +-
.../tieredstore/file/FlatFileStoreTest.java | 1 +
.../tieredstore/file/FlatMessageFileTest.java | 9 +++-
.../index/IndexStoreServiceBenchTest.java | 6 ++-
.../tieredstore/index/IndexStoreServiceTest.java | 6 ++-
18 files changed, 109 insertions(+), 64 deletions(-)
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreExecutor.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreExecutor.java
index d51b5dc420..920504015f 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreExecutor.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreExecutor.java
@@ -23,25 +23,20 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.utils.ThreadUtils;
+import org.apache.rocketmq.tieredstore.util.MessageStoreUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class MessageStoreExecutor {
- public final BlockingQueue<Runnable> bufferCommitThreadPoolQueue;
- public final BlockingQueue<Runnable> bufferFetchThreadPoolQueue;
- public final BlockingQueue<Runnable> fileRecyclingThreadPoolQueue;
+ private static final Logger log =
LoggerFactory.getLogger(MessageStoreUtil.TIERED_STORE_LOGGER_NAME);
- public final ScheduledExecutorService commonExecutor;
- public final ExecutorService bufferCommitExecutor;
- public final ExecutorService bufferFetchExecutor;
- public final ExecutorService fileRecyclingExecutor;
+ private final BlockingQueue<Runnable> bufferCommitThreadPoolQueue;
+ private final BlockingQueue<Runnable> bufferFetchThreadPoolQueue;
- private static class SingletonHolder {
- private static final MessageStoreExecutor INSTANCE = new
MessageStoreExecutor();
- }
-
- public static MessageStoreExecutor getInstance() {
- return SingletonHolder.INSTANCE;
- }
+ private final ScheduledExecutorService commonExecutor;
+ private final ExecutorService bufferCommitExecutor;
+ private final ExecutorService bufferFetchExecutor;
public MessageStoreExecutor() {
this(10000);
@@ -69,19 +64,32 @@ public class MessageStoreExecutor {
TimeUnit.MINUTES.toMillis(1), TimeUnit.MILLISECONDS,
this.bufferFetchThreadPoolQueue,
new ThreadFactoryImpl("BufferFetchExecutor_"));
+ }
- this.fileRecyclingThreadPoolQueue = new
LinkedBlockingQueue<>(maxQueueCapacity);
- this.fileRecyclingExecutor = ThreadUtils.newThreadPoolExecutor(
- Math.max(4, processors),
- Math.max(4, processors),
- TimeUnit.MINUTES.toMillis(1), TimeUnit.MILLISECONDS,
- this.fileRecyclingThreadPoolQueue,
- new ThreadFactoryImpl("BufferFetchExecutor_"));
+ public ScheduledExecutorService getCommonExecutor() {
+ return commonExecutor;
+ }
+
+ public ExecutorService getBufferCommitExecutor() {
+ return bufferCommitExecutor;
+ }
+
+ public ExecutorService getBufferFetchExecutor() {
+ return bufferFetchExecutor;
}
private void shutdownExecutor(ExecutorService executor) {
- if (executor != null) {
- executor.shutdown();
+ if (executor == null) {
+ return;
+ }
+ executor.shutdown();
+ try {
+ if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
+ executor.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ executor.shutdownNow();
+ Thread.currentThread().interrupt();
}
}
@@ -89,6 +97,6 @@ public class MessageStoreExecutor {
this.shutdownExecutor(this.commonExecutor);
this.shutdownExecutor(this.bufferCommitExecutor);
this.shutdownExecutor(this.bufferFetchExecutor);
- this.shutdownExecutor(this.fileRecyclingExecutor);
+ log.info("MessageStoreExecutor shutdown complete");
}
}
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
index ed67badf56..2b7c347b3c 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
@@ -112,7 +112,7 @@ public class TieredMessageStore extends
AbstractPluginMessageStore {
if (result) {
indexService.start();
dispatcher.start();
- storeExecutor.commonExecutor.scheduleWithFixedDelay(
+ storeExecutor.getCommonExecutor().scheduleWithFixedDelay(
flatFileStore::scheduleDeleteExpireFile,
storeConfig.getTieredStoreDeleteFileInterval(),
storeConfig.getTieredStoreDeleteFileInterval(),
TimeUnit.MILLISECONDS);
}
@@ -525,12 +525,10 @@ public class TieredMessageStore extends
AbstractPluginMessageStore {
@Override
public synchronized void shutdown() {
- if (next != null) {
- next.shutdown();
- }
if (dispatcher != null) {
dispatcher.shutdown();
}
+
if (indexService != null) {
if (defaultStore.getRunningFlags() != null &&
defaultStore.getRunningFlags().isStoreWriteable()) {
indexService.shutdown();
@@ -538,12 +536,16 @@ public class TieredMessageStore extends
AbstractPluginMessageStore {
indexService.forceShutdown();
}
}
+ if (storeExecutor != null) {
+ storeExecutor.shutdown();
+ }
if (flatFileStore != null) {
flatFileStore.shutdown();
}
- if (storeExecutor != null) {
- storeExecutor.shutdown();
+
+ if (next != null) {
+ next.shutdown();
}
}
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java
index 2a0dfed7a7..c391a1e4d0 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java
@@ -321,7 +321,7 @@ public class MessageStoreDispatcherImpl extends
ServiceThread implements Message
}
}
if (success && repeat) {
- storeExecutor.commonExecutor.submit(() ->
dispatch(flatFile));
+ storeExecutor.getCommonExecutor().submit(() ->
dispatch(flatFile));
}
}
);
@@ -341,7 +341,7 @@ public class MessageStoreDispatcherImpl extends
ServiceThread implements Message
}
public void constructIndexFile(long topicId, GroupCommitContext
groupCommitContext) {
- MessageStoreExecutor.getInstance().bufferCommitExecutor.submit(() -> {
+ storeExecutor.getBufferCommitExecutor().submit(() -> {
if (storeConfig.isMessageIndexEnable()) {
try {
groupCommitContext.getDispatchRequests().forEach(request
-> constructIndexFile0(topicId, request));
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
index 2a5dc2dd8a..43bfcc499d 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
@@ -219,7 +219,7 @@ public class MessageStoreFetcherImpl implements
MessageStoreFetcher {
// this method may trigger an RPC call, causing buffer fetch thread
starvation
return fetchMessageThenPutToCache(flatFile, queueOffset, fetchSize)
.thenApplyAsync(maxOffset -> getMessageFromCache(flatFile,
queueOffset, maxCount, messageFilter),
- messageStore.getStoreExecutor().commonExecutor);
+ messageStore.getStoreExecutor().getCommonExecutor());
}
public CompletableFuture<GetMessageResultExt>
getMessageFromTieredStoreAsync(
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileFactory.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileFactory.java
index d14ea7ffdb..ba6d028bc8 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileFactory.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileFactory.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.tieredstore.file;
-import com.google.common.annotations.VisibleForTesting;
import org.apache.rocketmq.tieredstore.MessageStoreConfig;
import org.apache.rocketmq.tieredstore.MessageStoreExecutor;
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
@@ -30,13 +29,6 @@ public class FlatFileFactory {
private final MessageStoreConfig storeConfig;
private final FileSegmentFactory fileSegmentFactory;
- @VisibleForTesting
- public FlatFileFactory(MetadataStore metadataStore, MessageStoreConfig
storeConfig) {
- this.metadataStore = metadataStore;
- this.storeConfig = storeConfig;
- this.fileSegmentFactory = new FileSegmentFactory(metadataStore,
storeConfig, new MessageStoreExecutor());
- }
-
public FlatFileFactory(MetadataStore metadataStore,
MessageStoreConfig storeConfig, MessageStoreExecutor executor) {
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileStore.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileStore.java
index 8e33e43c2d..625e9c69fe 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileStore.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileStore.java
@@ -100,7 +100,7 @@ public class FlatFileStore {
});
log.info("FlatFileStore recover file, topic={}, total={},
cost={}ms",
topicMetadata.getTopic(), queueCount.get(),
stopwatch.elapsed(TimeUnit.MILLISECONDS));
- }, executor.bufferCommitExecutor);
+ }, executor.getBufferCommitExecutor());
}
public void scheduleDeleteExpireFile() {
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
index 1427c6cef9..413c6af65e 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
@@ -22,8 +22,6 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -59,8 +57,6 @@ public class FlatMessageFile implements FlatFileInterface {
protected final FlatCommitLogFile commitLog;
protected final FlatConsumeQueueFile consumeQueue;
- protected final ConcurrentMap<String, CompletableFuture<?>>
inFlightRequestMap;
-
public FlatMessageFile(FlatFileFactory fileFactory, String topic, int
queueId) {
this(fileFactory, MessageStoreUtil.toFilePath(
new MessageQueue(topic,
fileFactory.getStoreConfig().getBrokerName(), queueId)));
@@ -75,7 +71,6 @@ public class FlatMessageFile implements FlatFileInterface {
this.metadataStore = fileFactory.getMetadataStore();
this.commitLog = fileFactory.createFlatFileForCommitLog(filePath);
this.consumeQueue =
fileFactory.createFlatFileForConsumeQueue(filePath);
- this.inFlightRequestMap = new ConcurrentHashMap<>();
}
@Override
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/FileSegment.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/FileSegment.java
index 0bedf64452..7235bc1083 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/FileSegment.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/FileSegment.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.rocketmq.tieredstore.MessageStoreConfig;
import org.apache.rocketmq.tieredstore.MessageStoreExecutor;
@@ -153,6 +154,14 @@ public abstract class FileSegment implements
Comparable<FileSegment>, FileSegmen
} finally {
fileLock.unlock();
}
+ CompletableFuture<Boolean> inflight = this.flightCommitRequest;
+ if (inflight != null && !inflight.isDone()) {
+ try {
+ inflight.get(30, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ log.warn("FileSegment close: await in-flight commit timeout,
filePath={}", filePath, e);
+ }
+ }
}
protected List<ByteBuffer> borrowBuffer() {
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java
index 719cd90f8d..70dd4f081b 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java
@@ -58,6 +58,8 @@ public class PosixFileSegment extends FileSegment {
private volatile File file;
private volatile FileChannel readFileChannel;
private volatile FileChannel writeFileChannel;
+ private volatile RandomAccessFile readRandomAccessFile;
+ private volatile RandomAccessFile writeRandomAccessFile;
public PosixFileSegment(MessageStoreConfig storeConfig,
FileSegmentType fileType, String filePath, long baseOffset,
MessageStoreExecutor executor) {
@@ -113,7 +115,7 @@ public class PosixFileSegment extends FileSegment {
}
}
- @SuppressWarnings({"resource", "ResultOfMethodCallIgnored"})
+ @SuppressWarnings("ResultOfMethodCallIgnored")
private void createFile0() {
try {
File file = new File(fullPath);
@@ -126,8 +128,10 @@ public class PosixFileSegment extends FileSegment {
log.debug("PosixFileSegment#createFile0, create file,
filePath={}", fullPath);
}
}
- this.readFileChannel = new RandomAccessFile(file,
"r").getChannel();
- this.writeFileChannel = new RandomAccessFile(file,
"rwd").getChannel();
+ this.readRandomAccessFile = new RandomAccessFile(file, "r");
+ this.readFileChannel = this.readRandomAccessFile.getChannel();
+ this.writeRandomAccessFile = new RandomAccessFile(file, "rwd");
+ this.writeFileChannel = this.writeRandomAccessFile.getChannel();
this.file = file;
} catch (Exception e) {
log.error("PosixFileSegment#createFile0, create file failed,
filePath={}", filePath, e);
@@ -155,10 +159,18 @@ public class PosixFileSegment extends FileSegment {
readFileChannel.close();
readFileChannel = null;
}
+ if (readRandomAccessFile != null) {
+ readRandomAccessFile.close();
+ readRandomAccessFile = null;
+ }
if (writeFileChannel != null && writeFileChannel.isOpen()) {
writeFileChannel.close();
writeFileChannel = null;
}
+ if (writeRandomAccessFile != null) {
+ writeRandomAccessFile.close();
+ writeRandomAccessFile = null;
+ }
} catch (IOException e) {
log.error("PosixFileSegment#close, close failed, filePath={}",
fullPath, e);
}
@@ -192,7 +204,7 @@ public class PosixFileSegment extends FileSegment {
TieredStoreMetricsManager.providerRpcLatency.record(costTime,
attributesBuilder.build());
}
return byteBuffer;
- }, executor.bufferFetchExecutor);
+ }, executor.getBufferFetchExecutor());
}
@Override
@@ -228,6 +240,6 @@ public class PosixFileSegment extends FileSegment {
return false;
}
return true;
- }, executor.bufferCommitExecutor);
+ }, executor.getBufferCommitExecutor());
}
}
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
index f88779f09b..6a3a6a6d5d 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
@@ -84,6 +84,7 @@ public class TieredMessageStoreTest {
Properties properties = new Properties();
properties.setProperty("recordGetMessageResult",
Boolean.TRUE.toString().toLowerCase(Locale.ROOT));
properties.setProperty("tieredBackendServiceProvider",
PosixFileSegment.class.getName());
+ properties.setProperty("tieredStoreFilePath", storePath);
configuration = new Configuration(LoggerFactory.getLogger(
MessageStoreUtil.TIERED_STORE_LOGGER_NAME), storePath +
File.separator + "conf",
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImplTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImplTest.java
index 2a0b0ad418..15f06d0548 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImplTest.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImplTest.java
@@ -89,6 +89,7 @@ public class MessageStoreDispatcherImplTest {
if (messageStore != null) {
messageStore.destroy();
}
+ executor.shutdown();
MessageStoreUtilTest.deleteStoreDirectory(storePath);
}
@@ -100,7 +101,7 @@ public class MessageStoreDispatcherImplTest {
messageStore = Mockito.mock(TieredMessageStore.class);
IndexService indexService =
- new IndexStoreService(new FlatFileFactory(metadataStore,
storeConfig), storePath);
+ new IndexStoreService(new FlatFileFactory(metadataStore,
storeConfig, executor), storePath);
indexService.start();
Mockito.when(messageStore.getDefaultStore()).thenReturn(defaultStore);
Mockito.when(messageStore.getStoreConfig()).thenReturn(storeConfig);
@@ -168,7 +169,7 @@ public class MessageStoreDispatcherImplTest {
messageStore = Mockito.mock(TieredMessageStore.class);
IndexService indexService =
- new IndexStoreService(new FlatFileFactory(metadataStore,
storeConfig), storePath);
+ new IndexStoreService(new FlatFileFactory(metadataStore,
storeConfig, executor), storePath);
indexService.start();
Mockito.when(messageStore.getDefaultStore()).thenReturn(defaultStore);
Mockito.when(messageStore.getStoreConfig()).thenReturn(storeConfig);
@@ -232,7 +233,7 @@ public class MessageStoreDispatcherImplTest {
messageStore = Mockito.mock(TieredMessageStore.class);
IndexService indexService =
- new IndexStoreService(new FlatFileFactory(metadataStore,
storeConfig), storePath);
+ new IndexStoreService(new FlatFileFactory(metadataStore,
storeConfig, executor), storePath);
indexService.start();
Mockito.when(messageStore.getDefaultStore()).thenReturn(defaultStore);
Mockito.when(messageStore.getStoreConfig()).thenReturn(storeConfig);
@@ -289,7 +290,7 @@ public class MessageStoreDispatcherImplTest {
MessageStore defaultStore = Mockito.mock(MessageStore.class);
messageStore = Mockito.mock(TieredMessageStore.class);
IndexService indexService =
- new IndexStoreService(new FlatFileFactory(metadataStore,
storeConfig), storePath);
+ new IndexStoreService(new FlatFileFactory(metadataStore,
storeConfig, executor), storePath);
Mockito.when(messageStore.getDefaultStore()).thenReturn(defaultStore);
Mockito.when(messageStore.getStoreConfig()).thenReturn(storeConfig);
Mockito.when(messageStore.getStoreExecutor()).thenReturn(executor);
@@ -323,7 +324,7 @@ public class MessageStoreDispatcherImplTest {
MessageStore defaultStore = Mockito.mock(MessageStore.class);
messageStore = Mockito.mock(TieredMessageStore.class);
IndexService indexService =
- new IndexStoreService(new FlatFileFactory(metadataStore,
storeConfig), storePath);
+ new IndexStoreService(new FlatFileFactory(metadataStore,
storeConfig, executor), storePath);
Mockito.when(messageStore.getDefaultStore()).thenReturn(defaultStore);
Mockito.when(messageStore.getStoreConfig()).thenReturn(storeConfig);
Mockito.when(messageStore.getStoreExecutor()).thenReturn(executor);
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatAppendFileTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatAppendFileTest.java
index 2e6943728e..ed223fcf9a 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatAppendFileTest.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatAppendFileTest.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
import java.util.concurrent.CompletionException;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.tieredstore.MessageStoreConfig;
+import org.apache.rocketmq.tieredstore.MessageStoreExecutor;
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
import org.apache.rocketmq.tieredstore.exception.TieredStoreErrorCode;
import org.apache.rocketmq.tieredstore.exception.TieredStoreException;
@@ -43,6 +44,7 @@ public class FlatAppendFileTest {
private MessageQueue queue;
private MetadataStore metadataStore;
private MessageStoreConfig storeConfig;
+ private MessageStoreExecutor executor;
private FlatFileFactory flatFileFactory;
@Before
@@ -56,11 +58,13 @@ public class FlatAppendFileTest {
storeConfig.setTieredStoreConsumeQueueMaxSize(2000L);
queue = new MessageQueue("TieredFlatFileTest",
storeConfig.getBrokerName(), 0);
metadataStore = new DefaultMetadataStore(storeConfig);
- flatFileFactory = new FlatFileFactory(metadataStore, storeConfig);
+ executor = new MessageStoreExecutor();
+ flatFileFactory = new FlatFileFactory(metadataStore, storeConfig,
executor);
}
@After
public void shutdown() throws IOException {
+ executor.shutdown();
MessageStoreUtilTest.deleteStoreDirectory(storePath);
}
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFileTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFileTest.java
index 0fbf5a6a84..d0b82d765e 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFileTest.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFileTest.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.tieredstore.MessageStoreConfig;
+import org.apache.rocketmq.tieredstore.MessageStoreExecutor;
import org.apache.rocketmq.tieredstore.common.AppendResult;
import org.apache.rocketmq.tieredstore.metadata.DefaultMetadataStore;
import org.apache.rocketmq.tieredstore.metadata.MetadataStore;
@@ -40,6 +41,7 @@ public class FlatCommitLogFileTest {
private MessageQueue queue;
private MetadataStore metadataStore;
private MessageStoreConfig storeConfig;
+ private MessageStoreExecutor executor;
private FlatFileFactory flatFileFactory;
@Before
@@ -53,11 +55,13 @@ public class FlatCommitLogFileTest {
storeConfig.setTieredStoreConsumeQueueMaxSize(2000L);
queue = new MessageQueue("TieredFlatFileTest",
storeConfig.getBrokerName(), 0);
metadataStore = new DefaultMetadataStore(storeConfig);
- flatFileFactory = new FlatFileFactory(metadataStore, storeConfig);
+ executor = new MessageStoreExecutor();
+ flatFileFactory = new FlatFileFactory(metadataStore, storeConfig,
executor);
}
@After
public void shutdown() throws IOException {
+ executor.shutdown();
MessageStoreUtilTest.deleteStoreDirectory(storePath);
}
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatFileFactoryTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatFileFactoryTest.java
index bc8ebaf1cb..1c322d24f3 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatFileFactoryTest.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatFileFactoryTest.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.tieredstore.file;
import org.apache.rocketmq.tieredstore.MessageStoreConfig;
+import org.apache.rocketmq.tieredstore.MessageStoreExecutor;
import org.apache.rocketmq.tieredstore.metadata.DefaultMetadataStore;
import org.apache.rocketmq.tieredstore.metadata.MetadataStore;
import org.apache.rocketmq.tieredstore.util.MessageStoreUtilTest;
@@ -30,7 +31,8 @@ public class FlatFileFactoryTest {
MessageStoreConfig storeConfig = new MessageStoreConfig();
storeConfig.setTieredStoreFilePath(MessageStoreUtilTest.getRandomStorePath());
MetadataStore metadataStore = new DefaultMetadataStore(storeConfig);
- FlatFileFactory factory = new FlatFileFactory(metadataStore,
storeConfig);
+ MessageStoreExecutor executor = new MessageStoreExecutor();
+ FlatFileFactory factory = new FlatFileFactory(metadataStore,
storeConfig, executor);
Assert.assertEquals(storeConfig, factory.getStoreConfig());
Assert.assertEquals(metadataStore, factory.getMetadataStore());
@@ -45,5 +47,6 @@ public class FlatFileFactoryTest {
flatFile1.destroy();
flatFile2.destroy();
flatFile3.destroy();
+ executor.shutdown();
}
}
\ No newline at end of file
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatFileStoreTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatFileStoreTest.java
index 2a007af4e9..0005d825d5 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatFileStoreTest.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatFileStoreTest.java
@@ -45,6 +45,7 @@ public class FlatFileStoreTest {
public void init() {
storeConfig = new MessageStoreConfig();
storeConfig.setStorePathRootDir(storePath);
+ storeConfig.setTieredStoreFilePath(storePath);
storeConfig.setTieredBackendServiceProvider(PosixFileSegment.class.getName());
storeConfig.setBrokerName("brokerName");
metadataStore = new DefaultMetadataStore(storeConfig);
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatMessageFileTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatMessageFileTest.java
index 97768d0658..0ddaa7ce4f 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatMessageFileTest.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatMessageFileTest.java
@@ -23,6 +23,7 @@ import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.store.ConsumeQueue;
import org.apache.rocketmq.store.DispatchRequest;
import org.apache.rocketmq.tieredstore.MessageStoreConfig;
+import org.apache.rocketmq.tieredstore.MessageStoreExecutor;
import org.apache.rocketmq.tieredstore.common.AppendResult;
import org.apache.rocketmq.tieredstore.metadata.DefaultMetadataStore;
import org.apache.rocketmq.tieredstore.metadata.MetadataStore;
@@ -41,6 +42,7 @@ public class FlatMessageFileTest {
private final String storePath = MessageStoreUtilTest.getRandomStorePath();
private MessageStoreConfig storeConfig;
private MetadataStore metadataStore;
+ private MessageStoreExecutor executor;
private FlatFileFactory flatFileFactory;
@Before
@@ -48,15 +50,18 @@ public class FlatMessageFileTest {
storeConfig = new MessageStoreConfig();
storeConfig.setBrokerName("brokerName");
storeConfig.setStorePathRootDir(storePath);
+ storeConfig.setTieredStoreFilePath(storePath);
storeConfig.setTieredBackendServiceProvider(PosixFileSegment.class.getName());
storeConfig.setCommitLogRollingInterval(0);
storeConfig.setCommitLogRollingMinimumSize(999);
metadataStore = new DefaultMetadataStore(storeConfig);
- flatFileFactory = new FlatFileFactory(metadataStore, storeConfig);
+ executor = new MessageStoreExecutor();
+ flatFileFactory = new FlatFileFactory(metadataStore, storeConfig,
executor);
}
@After
public void shutdown() throws IOException {
+ executor.shutdown();
MessageStoreUtilTest.deleteStoreDirectory(storePath);
}
@@ -142,7 +147,7 @@ public class FlatMessageFileTest {
// replace provider, need new factory again
storeConfig.setTieredBackendServiceProvider(PosixFileSegment.class.getName());
- flatFileFactory = new FlatFileFactory(metadataStore, storeConfig);
+ flatFileFactory = new FlatFileFactory(metadataStore, storeConfig,
executor);
// inject store time: 0, +100, +100, +100, +200
MessageQueue mq = new MessageQueue("TopicTest", "BrokerName", 1);
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceBenchTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceBenchTest.java
index fcb28402ea..5a579bef9e 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceBenchTest.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceBenchTest.java
@@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.tieredstore.MessageStoreConfig;
+import org.apache.rocketmq.tieredstore.MessageStoreExecutor;
import org.apache.rocketmq.tieredstore.common.AppendResult;
import org.apache.rocketmq.tieredstore.file.FlatFileFactory;
import org.apache.rocketmq.tieredstore.metadata.DefaultMetadataStore;
@@ -61,6 +62,7 @@ public class IndexStoreServiceBenchTest {
private static final Logger log =
LoggerFactory.getLogger(MessageStoreUtil.TIERED_STORE_LOGGER_NAME);
private static final String TOPIC_NAME = "TopicTest";
private MessageStoreConfig storeConfig;
+ private MessageStoreExecutor executor;
private IndexStoreService indexStoreService;
private final LongAdder failureCount = new LongAdder();
@@ -77,7 +79,8 @@ public class IndexStoreServiceBenchTest {
storeConfig.setTieredStoreIndexFileMaxHashSlotNum(500 * 1000);
storeConfig.setTieredStoreIndexFileMaxIndexNum(2000 * 1000);
MetadataStore metadataStore = new DefaultMetadataStore(storeConfig);
- FlatFileFactory flatFileFactory = new FlatFileFactory(metadataStore,
storeConfig);
+ executor = new MessageStoreExecutor();
+ FlatFileFactory flatFileFactory = new FlatFileFactory(metadataStore,
storeConfig, executor);
indexStoreService = new IndexStoreService(flatFileFactory, storePath);
indexStoreService.start();
}
@@ -86,6 +89,7 @@ public class IndexStoreServiceBenchTest {
public void shutdown() throws IOException {
indexStoreService.shutdown();
indexStoreService.destroy();
+ executor.shutdown();
}
//@Benchmark
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java
index 4736d5f585..0d849d5927 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java
@@ -37,6 +37,7 @@ import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.store.logfile.DefaultMappedFile;
import org.apache.rocketmq.tieredstore.MessageStoreConfig;
+import org.apache.rocketmq.tieredstore.MessageStoreExecutor;
import org.apache.rocketmq.tieredstore.common.AppendResult;
import org.apache.rocketmq.tieredstore.file.FlatAppendFile;
import org.apache.rocketmq.tieredstore.file.FlatFileFactory;
@@ -67,6 +68,7 @@ public class IndexStoreServiceTest {
private String filePath;
private MessageStoreConfig storeConfig;
+ private MessageStoreExecutor executor;
private FlatFileFactory fileAllocator;
private IndexStoreService indexService;
@@ -81,7 +83,8 @@ public class IndexStoreServiceTest {
storeConfig.setTieredStoreIndexFileMaxIndexNum(20);
storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.PosixFileSegment");
MetadataStore metadataStore = new DefaultMetadataStore(storeConfig);
- fileAllocator = new FlatFileFactory(metadataStore, storeConfig);
+ executor = new MessageStoreExecutor();
+ fileAllocator = new FlatFileFactory(metadataStore, storeConfig,
executor);
}
@After
@@ -90,6 +93,7 @@ public class IndexStoreServiceTest {
indexService.shutdown();
indexService.destroy();
}
+ executor.shutdown();
MessageStoreUtilTest.deleteStoreDirectory(storeConfig.getTieredStoreFilePath());
}