This is an automated email from the ASF dual-hosted git repository.

lizhimins pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new cf9113fce9 [ISSUE #10462] Fix resource leaks and lifecycle issues in 
tiered storage (#10476)
cf9113fce9 is described below

commit cf9113fce91158e2c9d32f1dc38af687946307d6
Author: lizhimins <[email protected]>
AuthorDate: Thu Jun 11 20:47:54 2026 +0800

    [ISSUE #10462] Fix resource leaks and lifecycle issues in tiered storage 
(#10476)
---
 .../rocketmq/tieredstore/MessageStoreExecutor.java | 56 ++++++++++++----------
 .../rocketmq/tieredstore/TieredMessageStore.java   | 14 +++---
 .../core/MessageStoreDispatcherImpl.java           |  4 +-
 .../tieredstore/core/MessageStoreFetcherImpl.java  |  2 +-
 .../rocketmq/tieredstore/file/FlatFileFactory.java |  8 ----
 .../rocketmq/tieredstore/file/FlatFileStore.java   |  2 +-
 .../rocketmq/tieredstore/file/FlatMessageFile.java |  5 --
 .../rocketmq/tieredstore/provider/FileSegment.java |  9 ++++
 .../tieredstore/provider/PosixFileSegment.java     | 22 +++++++--
 .../tieredstore/TieredMessageStoreTest.java        |  1 +
 .../core/MessageStoreDispatcherImplTest.java       | 11 +++--
 .../tieredstore/file/FlatAppendFileTest.java       |  6 ++-
 .../tieredstore/file/FlatCommitLogFileTest.java    |  6 ++-
 .../tieredstore/file/FlatFileFactoryTest.java      |  5 +-
 .../tieredstore/file/FlatFileStoreTest.java        |  1 +
 .../tieredstore/file/FlatMessageFileTest.java      |  9 +++-
 .../index/IndexStoreServiceBenchTest.java          |  6 ++-
 .../tieredstore/index/IndexStoreServiceTest.java   |  6 ++-
 18 files changed, 109 insertions(+), 64 deletions(-)

diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreExecutor.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreExecutor.java
index d51b5dc420..920504015f 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreExecutor.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/MessageStoreExecutor.java
@@ -23,25 +23,20 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.common.utils.ThreadUtils;
+import org.apache.rocketmq.tieredstore.util.MessageStoreUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class MessageStoreExecutor {
 
-    public final BlockingQueue<Runnable> bufferCommitThreadPoolQueue;
-    public final BlockingQueue<Runnable> bufferFetchThreadPoolQueue;
-    public final BlockingQueue<Runnable> fileRecyclingThreadPoolQueue;
+    private static final Logger log = 
LoggerFactory.getLogger(MessageStoreUtil.TIERED_STORE_LOGGER_NAME);
 
-    public final ScheduledExecutorService commonExecutor;
-    public final ExecutorService bufferCommitExecutor;
-    public final ExecutorService bufferFetchExecutor;
-    public final ExecutorService fileRecyclingExecutor;
+    private final BlockingQueue<Runnable> bufferCommitThreadPoolQueue;
+    private final BlockingQueue<Runnable> bufferFetchThreadPoolQueue;
 
-    private static class SingletonHolder {
-        private static final MessageStoreExecutor INSTANCE = new 
MessageStoreExecutor();
-    }
-
-    public static MessageStoreExecutor getInstance() {
-        return SingletonHolder.INSTANCE;
-    }
+    private final ScheduledExecutorService commonExecutor;
+    private final ExecutorService bufferCommitExecutor;
+    private final ExecutorService bufferFetchExecutor;
 
     public MessageStoreExecutor() {
         this(10000);
@@ -69,19 +64,32 @@ public class MessageStoreExecutor {
             TimeUnit.MINUTES.toMillis(1), TimeUnit.MILLISECONDS,
             this.bufferFetchThreadPoolQueue,
             new ThreadFactoryImpl("BufferFetchExecutor_"));
+    }
 
-        this.fileRecyclingThreadPoolQueue = new 
LinkedBlockingQueue<>(maxQueueCapacity);
-        this.fileRecyclingExecutor = ThreadUtils.newThreadPoolExecutor(
-            Math.max(4, processors),
-            Math.max(4, processors),
-            TimeUnit.MINUTES.toMillis(1), TimeUnit.MILLISECONDS,
-            this.fileRecyclingThreadPoolQueue,
-            new ThreadFactoryImpl("BufferFetchExecutor_"));
+    public ScheduledExecutorService getCommonExecutor() {
+        return commonExecutor;
+    }
+
+    public ExecutorService getBufferCommitExecutor() {
+        return bufferCommitExecutor;
+    }
+
+    public ExecutorService getBufferFetchExecutor() {
+        return bufferFetchExecutor;
     }
 
     private void shutdownExecutor(ExecutorService executor) {
-        if (executor != null) {
-            executor.shutdown();
+        if (executor == null) {
+            return;
+        }
+        executor.shutdown();
+        try {
+            if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
+                executor.shutdownNow();
+            }
+        } catch (InterruptedException e) {
+            executor.shutdownNow();
+            Thread.currentThread().interrupt();
         }
     }
 
@@ -89,6 +97,6 @@ public class MessageStoreExecutor {
         this.shutdownExecutor(this.commonExecutor);
         this.shutdownExecutor(this.bufferCommitExecutor);
         this.shutdownExecutor(this.bufferFetchExecutor);
-        this.shutdownExecutor(this.fileRecyclingExecutor);
+        log.info("MessageStoreExecutor shutdown complete");
     }
 }
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
index ed67badf56..2b7c347b3c 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
@@ -112,7 +112,7 @@ public class TieredMessageStore extends 
AbstractPluginMessageStore {
         if (result) {
             indexService.start();
             dispatcher.start();
-            storeExecutor.commonExecutor.scheduleWithFixedDelay(
+            storeExecutor.getCommonExecutor().scheduleWithFixedDelay(
                 flatFileStore::scheduleDeleteExpireFile, 
storeConfig.getTieredStoreDeleteFileInterval(),
                 storeConfig.getTieredStoreDeleteFileInterval(), 
TimeUnit.MILLISECONDS);
         }
@@ -525,12 +525,10 @@ public class TieredMessageStore extends 
AbstractPluginMessageStore {
 
     @Override
     public synchronized void shutdown() {
-        if (next != null) {
-            next.shutdown();
-        }
         if (dispatcher != null) {
             dispatcher.shutdown();
         }
+
         if (indexService != null) {
             if (defaultStore.getRunningFlags() != null && 
defaultStore.getRunningFlags().isStoreWriteable()) {
                 indexService.shutdown();
@@ -538,12 +536,16 @@ public class TieredMessageStore extends 
AbstractPluginMessageStore {
                 indexService.forceShutdown();
             }
         }
+        if (storeExecutor != null) {
+            storeExecutor.shutdown();
+        }
 
         if (flatFileStore != null) {
             flatFileStore.shutdown();
         }
-        if (storeExecutor != null) {
-            storeExecutor.shutdown();
+
+        if (next != null) {
+            next.shutdown();
         }
     }
 
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java
index 2a0dfed7a7..c391a1e4d0 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java
@@ -321,7 +321,7 @@ public class MessageStoreDispatcherImpl extends 
ServiceThread implements Message
                             }
                         }
                         if (success && repeat) {
-                            storeExecutor.commonExecutor.submit(() -> 
dispatch(flatFile));
+                            storeExecutor.getCommonExecutor().submit(() -> 
dispatch(flatFile));
                         }
                     }
                 );
@@ -341,7 +341,7 @@ public class MessageStoreDispatcherImpl extends 
ServiceThread implements Message
     }
 
     public void constructIndexFile(long topicId, GroupCommitContext 
groupCommitContext) {
-        MessageStoreExecutor.getInstance().bufferCommitExecutor.submit(() -> {
+        storeExecutor.getBufferCommitExecutor().submit(() -> {
             if (storeConfig.isMessageIndexEnable()) {
                 try {
                     groupCommitContext.getDispatchRequests().forEach(request 
-> constructIndexFile0(topicId, request));
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
index 2a5dc2dd8a..43bfcc499d 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java
@@ -219,7 +219,7 @@ public class MessageStoreFetcherImpl implements 
MessageStoreFetcher {
         // this method may trigger an RPC call, causing buffer fetch thread 
starvation
         return fetchMessageThenPutToCache(flatFile, queueOffset, fetchSize)
             .thenApplyAsync(maxOffset -> getMessageFromCache(flatFile, 
queueOffset, maxCount, messageFilter),
-                messageStore.getStoreExecutor().commonExecutor);
+                messageStore.getStoreExecutor().getCommonExecutor());
     }
 
     public CompletableFuture<GetMessageResultExt> 
getMessageFromTieredStoreAsync(
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileFactory.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileFactory.java
index d14ea7ffdb..ba6d028bc8 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileFactory.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileFactory.java
@@ -17,7 +17,6 @@
 
 package org.apache.rocketmq.tieredstore.file;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.rocketmq.tieredstore.MessageStoreConfig;
 import org.apache.rocketmq.tieredstore.MessageStoreExecutor;
 import org.apache.rocketmq.tieredstore.common.FileSegmentType;
@@ -30,13 +29,6 @@ public class FlatFileFactory {
     private final MessageStoreConfig storeConfig;
     private final FileSegmentFactory fileSegmentFactory;
 
-    @VisibleForTesting
-    public FlatFileFactory(MetadataStore metadataStore, MessageStoreConfig 
storeConfig) {
-        this.metadataStore = metadataStore;
-        this.storeConfig = storeConfig;
-        this.fileSegmentFactory = new FileSegmentFactory(metadataStore, 
storeConfig, new MessageStoreExecutor());
-    }
-
     public FlatFileFactory(MetadataStore metadataStore,
         MessageStoreConfig storeConfig, MessageStoreExecutor executor) {
 
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileStore.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileStore.java
index 8e33e43c2d..625e9c69fe 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileStore.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatFileStore.java
@@ -100,7 +100,7 @@ public class FlatFileStore {
             });
             log.info("FlatFileStore recover file, topic={}, total={}, 
cost={}ms",
                 topicMetadata.getTopic(), queueCount.get(), 
stopwatch.elapsed(TimeUnit.MILLISECONDS));
-        }, executor.bufferCommitExecutor);
+        }, executor.getBufferCommitExecutor());
     }
 
     public void scheduleDeleteExpireFile() {
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
index 1427c6cef9..413c6af65e 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/FlatMessageFile.java
@@ -22,8 +22,6 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -59,8 +57,6 @@ public class FlatMessageFile implements FlatFileInterface {
     protected final FlatCommitLogFile commitLog;
     protected final FlatConsumeQueueFile consumeQueue;
 
-    protected final ConcurrentMap<String, CompletableFuture<?>> 
inFlightRequestMap;
-
     public FlatMessageFile(FlatFileFactory fileFactory, String topic, int 
queueId) {
         this(fileFactory, MessageStoreUtil.toFilePath(
             new MessageQueue(topic, 
fileFactory.getStoreConfig().getBrokerName(), queueId)));
@@ -75,7 +71,6 @@ public class FlatMessageFile implements FlatFileInterface {
         this.metadataStore = fileFactory.getMetadataStore();
         this.commitLog = fileFactory.createFlatFileForCommitLog(filePath);
         this.consumeQueue = 
fileFactory.createFlatFileForConsumeQueue(filePath);
-        this.inFlightRequestMap = new ConcurrentHashMap<>();
     }
 
     @Override
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/FileSegment.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/FileSegment.java
index 0bedf64452..7235bc1083 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/FileSegment.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/FileSegment.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 import org.apache.rocketmq.tieredstore.MessageStoreConfig;
 import org.apache.rocketmq.tieredstore.MessageStoreExecutor;
@@ -153,6 +154,14 @@ public abstract class FileSegment implements 
Comparable<FileSegment>, FileSegmen
         } finally {
             fileLock.unlock();
         }
+        CompletableFuture<Boolean> inflight = this.flightCommitRequest;
+        if (inflight != null && !inflight.isDone()) {
+            try {
+                inflight.get(30, TimeUnit.SECONDS);
+            } catch (Exception e) {
+                log.warn("FileSegment close: await in-flight commit timeout, 
filePath={}", filePath, e);
+            }
+        }
     }
 
     protected List<ByteBuffer> borrowBuffer() {
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java
index 719cd90f8d..70dd4f081b 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java
@@ -58,6 +58,8 @@ public class PosixFileSegment extends FileSegment {
     private volatile File file;
     private volatile FileChannel readFileChannel;
     private volatile FileChannel writeFileChannel;
+    private volatile RandomAccessFile readRandomAccessFile;
+    private volatile RandomAccessFile writeRandomAccessFile;
 
     public PosixFileSegment(MessageStoreConfig storeConfig,
         FileSegmentType fileType, String filePath, long baseOffset, 
MessageStoreExecutor executor) {
@@ -113,7 +115,7 @@ public class PosixFileSegment extends FileSegment {
         }
     }
 
-    @SuppressWarnings({"resource", "ResultOfMethodCallIgnored"})
+    @SuppressWarnings("ResultOfMethodCallIgnored")
     private void createFile0() {
         try {
             File file = new File(fullPath);
@@ -126,8 +128,10 @@ public class PosixFileSegment extends FileSegment {
                     log.debug("PosixFileSegment#createFile0, create file, 
filePath={}", fullPath);
                 }
             }
-            this.readFileChannel = new RandomAccessFile(file, 
"r").getChannel();
-            this.writeFileChannel = new RandomAccessFile(file, 
"rwd").getChannel();
+            this.readRandomAccessFile = new RandomAccessFile(file, "r");
+            this.readFileChannel = this.readRandomAccessFile.getChannel();
+            this.writeRandomAccessFile = new RandomAccessFile(file, "rwd");
+            this.writeFileChannel = this.writeRandomAccessFile.getChannel();
             this.file = file;
         } catch (Exception e) {
             log.error("PosixFileSegment#createFile0, create file failed, 
filePath={}", filePath, e);
@@ -155,10 +159,18 @@ public class PosixFileSegment extends FileSegment {
                 readFileChannel.close();
                 readFileChannel = null;
             }
+            if (readRandomAccessFile != null) {
+                readRandomAccessFile.close();
+                readRandomAccessFile = null;
+            }
             if (writeFileChannel != null && writeFileChannel.isOpen()) {
                 writeFileChannel.close();
                 writeFileChannel = null;
             }
+            if (writeRandomAccessFile != null) {
+                writeRandomAccessFile.close();
+                writeRandomAccessFile = null;
+            }
         } catch (IOException e) {
             log.error("PosixFileSegment#close, close failed, filePath={}", 
fullPath, e);
         }
@@ -192,7 +204,7 @@ public class PosixFileSegment extends FileSegment {
                 TieredStoreMetricsManager.providerRpcLatency.record(costTime, 
attributesBuilder.build());
             }
             return byteBuffer;
-        }, executor.bufferFetchExecutor);
+        }, executor.getBufferFetchExecutor());
     }
 
     @Override
@@ -228,6 +240,6 @@ public class PosixFileSegment extends FileSegment {
                 return false;
             }
             return true;
-        }, executor.bufferCommitExecutor);
+        }, executor.getBufferCommitExecutor());
     }
 }
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
index f88779f09b..6a3a6a6d5d 100644
--- 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
@@ -84,6 +84,7 @@ public class TieredMessageStoreTest {
         Properties properties = new Properties();
         properties.setProperty("recordGetMessageResult", 
Boolean.TRUE.toString().toLowerCase(Locale.ROOT));
         properties.setProperty("tieredBackendServiceProvider", 
PosixFileSegment.class.getName());
+        properties.setProperty("tieredStoreFilePath", storePath);
 
         configuration = new Configuration(LoggerFactory.getLogger(
             MessageStoreUtil.TIERED_STORE_LOGGER_NAME), storePath + 
File.separator + "conf",
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImplTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImplTest.java
index 2a0b0ad418..15f06d0548 100644
--- 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImplTest.java
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImplTest.java
@@ -89,6 +89,7 @@ public class MessageStoreDispatcherImplTest {
         if (messageStore != null) {
             messageStore.destroy();
         }
+        executor.shutdown();
         MessageStoreUtilTest.deleteStoreDirectory(storePath);
     }
 
@@ -100,7 +101,7 @@ public class MessageStoreDispatcherImplTest {
 
         messageStore = Mockito.mock(TieredMessageStore.class);
         IndexService indexService =
-            new IndexStoreService(new FlatFileFactory(metadataStore, 
storeConfig), storePath);
+            new IndexStoreService(new FlatFileFactory(metadataStore, 
storeConfig, executor), storePath);
         indexService.start();
         Mockito.when(messageStore.getDefaultStore()).thenReturn(defaultStore);
         Mockito.when(messageStore.getStoreConfig()).thenReturn(storeConfig);
@@ -168,7 +169,7 @@ public class MessageStoreDispatcherImplTest {
 
         messageStore = Mockito.mock(TieredMessageStore.class);
         IndexService indexService =
-            new IndexStoreService(new FlatFileFactory(metadataStore, 
storeConfig), storePath);
+            new IndexStoreService(new FlatFileFactory(metadataStore, 
storeConfig, executor), storePath);
         indexService.start();
         Mockito.when(messageStore.getDefaultStore()).thenReturn(defaultStore);
         Mockito.when(messageStore.getStoreConfig()).thenReturn(storeConfig);
@@ -232,7 +233,7 @@ public class MessageStoreDispatcherImplTest {
 
         messageStore = Mockito.mock(TieredMessageStore.class);
         IndexService indexService =
-            new IndexStoreService(new FlatFileFactory(metadataStore, 
storeConfig), storePath);
+            new IndexStoreService(new FlatFileFactory(metadataStore, 
storeConfig, executor), storePath);
         indexService.start();
         Mockito.when(messageStore.getDefaultStore()).thenReturn(defaultStore);
         Mockito.when(messageStore.getStoreConfig()).thenReturn(storeConfig);
@@ -289,7 +290,7 @@ public class MessageStoreDispatcherImplTest {
         MessageStore defaultStore = Mockito.mock(MessageStore.class);
         messageStore = Mockito.mock(TieredMessageStore.class);
         IndexService indexService =
-            new IndexStoreService(new FlatFileFactory(metadataStore, 
storeConfig), storePath);
+            new IndexStoreService(new FlatFileFactory(metadataStore, 
storeConfig, executor), storePath);
         Mockito.when(messageStore.getDefaultStore()).thenReturn(defaultStore);
         Mockito.when(messageStore.getStoreConfig()).thenReturn(storeConfig);
         Mockito.when(messageStore.getStoreExecutor()).thenReturn(executor);
@@ -323,7 +324,7 @@ public class MessageStoreDispatcherImplTest {
         MessageStore defaultStore = Mockito.mock(MessageStore.class);
         messageStore = Mockito.mock(TieredMessageStore.class);
         IndexService indexService =
-            new IndexStoreService(new FlatFileFactory(metadataStore, 
storeConfig), storePath);
+            new IndexStoreService(new FlatFileFactory(metadataStore, 
storeConfig, executor), storePath);
         Mockito.when(messageStore.getDefaultStore()).thenReturn(defaultStore);
         Mockito.when(messageStore.getStoreConfig()).thenReturn(storeConfig);
         Mockito.when(messageStore.getStoreExecutor()).thenReturn(executor);
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatAppendFileTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatAppendFileTest.java
index 2e6943728e..ed223fcf9a 100644
--- 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatAppendFileTest.java
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatAppendFileTest.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
 import java.util.concurrent.CompletionException;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.tieredstore.MessageStoreConfig;
+import org.apache.rocketmq.tieredstore.MessageStoreExecutor;
 import org.apache.rocketmq.tieredstore.common.FileSegmentType;
 import org.apache.rocketmq.tieredstore.exception.TieredStoreErrorCode;
 import org.apache.rocketmq.tieredstore.exception.TieredStoreException;
@@ -43,6 +44,7 @@ public class FlatAppendFileTest {
     private MessageQueue queue;
     private MetadataStore metadataStore;
     private MessageStoreConfig storeConfig;
+    private MessageStoreExecutor executor;
     private FlatFileFactory flatFileFactory;
 
     @Before
@@ -56,11 +58,13 @@ public class FlatAppendFileTest {
         storeConfig.setTieredStoreConsumeQueueMaxSize(2000L);
         queue = new MessageQueue("TieredFlatFileTest", 
storeConfig.getBrokerName(), 0);
         metadataStore = new DefaultMetadataStore(storeConfig);
-        flatFileFactory = new FlatFileFactory(metadataStore, storeConfig);
+        executor = new MessageStoreExecutor();
+        flatFileFactory = new FlatFileFactory(metadataStore, storeConfig, 
executor);
     }
 
     @After
     public void shutdown() throws IOException {
+        executor.shutdown();
         MessageStoreUtilTest.deleteStoreDirectory(storePath);
     }
 
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFileTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFileTest.java
index 0fbf5a6a84..d0b82d765e 100644
--- 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFileTest.java
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatCommitLogFileTest.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
 import java.util.concurrent.TimeUnit;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.tieredstore.MessageStoreConfig;
+import org.apache.rocketmq.tieredstore.MessageStoreExecutor;
 import org.apache.rocketmq.tieredstore.common.AppendResult;
 import org.apache.rocketmq.tieredstore.metadata.DefaultMetadataStore;
 import org.apache.rocketmq.tieredstore.metadata.MetadataStore;
@@ -40,6 +41,7 @@ public class FlatCommitLogFileTest {
     private MessageQueue queue;
     private MetadataStore metadataStore;
     private MessageStoreConfig storeConfig;
+    private MessageStoreExecutor executor;
     private FlatFileFactory flatFileFactory;
 
     @Before
@@ -53,11 +55,13 @@ public class FlatCommitLogFileTest {
         storeConfig.setTieredStoreConsumeQueueMaxSize(2000L);
         queue = new MessageQueue("TieredFlatFileTest", 
storeConfig.getBrokerName(), 0);
         metadataStore = new DefaultMetadataStore(storeConfig);
-        flatFileFactory = new FlatFileFactory(metadataStore, storeConfig);
+        executor = new MessageStoreExecutor();
+        flatFileFactory = new FlatFileFactory(metadataStore, storeConfig, 
executor);
     }
 
     @After
     public void shutdown() throws IOException {
+        executor.shutdown();
         MessageStoreUtilTest.deleteStoreDirectory(storePath);
     }
 
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatFileFactoryTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatFileFactoryTest.java
index bc8ebaf1cb..1c322d24f3 100644
--- 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatFileFactoryTest.java
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatFileFactoryTest.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.tieredstore.file;
 
 import org.apache.rocketmq.tieredstore.MessageStoreConfig;
+import org.apache.rocketmq.tieredstore.MessageStoreExecutor;
 import org.apache.rocketmq.tieredstore.metadata.DefaultMetadataStore;
 import org.apache.rocketmq.tieredstore.metadata.MetadataStore;
 import org.apache.rocketmq.tieredstore.util.MessageStoreUtilTest;
@@ -30,7 +31,8 @@ public class FlatFileFactoryTest {
         MessageStoreConfig storeConfig = new MessageStoreConfig();
         
storeConfig.setTieredStoreFilePath(MessageStoreUtilTest.getRandomStorePath());
         MetadataStore metadataStore = new DefaultMetadataStore(storeConfig);
-        FlatFileFactory factory = new FlatFileFactory(metadataStore, 
storeConfig);
+        MessageStoreExecutor executor = new MessageStoreExecutor();
+        FlatFileFactory factory = new FlatFileFactory(metadataStore, 
storeConfig, executor);
         Assert.assertEquals(storeConfig, factory.getStoreConfig());
         Assert.assertEquals(metadataStore, factory.getMetadataStore());
 
@@ -45,5 +47,6 @@ public class FlatFileFactoryTest {
         flatFile1.destroy();
         flatFile2.destroy();
         flatFile3.destroy();
+        executor.shutdown();
     }
 }
\ No newline at end of file
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatFileStoreTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatFileStoreTest.java
index 2a007af4e9..0005d825d5 100644
--- 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatFileStoreTest.java
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatFileStoreTest.java
@@ -45,6 +45,7 @@ public class FlatFileStoreTest {
     public void init() {
         storeConfig = new MessageStoreConfig();
         storeConfig.setStorePathRootDir(storePath);
+        storeConfig.setTieredStoreFilePath(storePath);
         
storeConfig.setTieredBackendServiceProvider(PosixFileSegment.class.getName());
         storeConfig.setBrokerName("brokerName");
         metadataStore = new DefaultMetadataStore(storeConfig);
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatMessageFileTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatMessageFileTest.java
index 97768d0658..0ddaa7ce4f 100644
--- 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatMessageFileTest.java
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/FlatMessageFileTest.java
@@ -23,6 +23,7 @@ import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.store.ConsumeQueue;
 import org.apache.rocketmq.store.DispatchRequest;
 import org.apache.rocketmq.tieredstore.MessageStoreConfig;
+import org.apache.rocketmq.tieredstore.MessageStoreExecutor;
 import org.apache.rocketmq.tieredstore.common.AppendResult;
 import org.apache.rocketmq.tieredstore.metadata.DefaultMetadataStore;
 import org.apache.rocketmq.tieredstore.metadata.MetadataStore;
@@ -41,6 +42,7 @@ public class FlatMessageFileTest {
     private final String storePath = MessageStoreUtilTest.getRandomStorePath();
     private MessageStoreConfig storeConfig;
     private MetadataStore metadataStore;
+    private MessageStoreExecutor executor;
     private FlatFileFactory flatFileFactory;
 
     @Before
@@ -48,15 +50,18 @@ public class FlatMessageFileTest {
         storeConfig = new MessageStoreConfig();
         storeConfig.setBrokerName("brokerName");
         storeConfig.setStorePathRootDir(storePath);
+        storeConfig.setTieredStoreFilePath(storePath);
         
storeConfig.setTieredBackendServiceProvider(PosixFileSegment.class.getName());
         storeConfig.setCommitLogRollingInterval(0);
         storeConfig.setCommitLogRollingMinimumSize(999);
         metadataStore = new DefaultMetadataStore(storeConfig);
-        flatFileFactory = new FlatFileFactory(metadataStore, storeConfig);
+        executor = new MessageStoreExecutor();
+        flatFileFactory = new FlatFileFactory(metadataStore, storeConfig, 
executor);
     }
 
     @After
     public void shutdown() throws IOException {
+        executor.shutdown();
         MessageStoreUtilTest.deleteStoreDirectory(storePath);
     }
 
@@ -142,7 +147,7 @@ public class FlatMessageFileTest {
 
         // replace provider, need new factory again
         
storeConfig.setTieredBackendServiceProvider(PosixFileSegment.class.getName());
-        flatFileFactory = new FlatFileFactory(metadataStore, storeConfig);
+        flatFileFactory = new FlatFileFactory(metadataStore, storeConfig, 
executor);
 
         // inject store time: 0, +100, +100, +100, +200
         MessageQueue mq = new MessageQueue("TopicTest", "BrokerName", 1);
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceBenchTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceBenchTest.java
index fcb28402ea..5a579bef9e 100644
--- 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceBenchTest.java
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceBenchTest.java
@@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.LongAdder;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.tieredstore.MessageStoreConfig;
+import org.apache.rocketmq.tieredstore.MessageStoreExecutor;
 import org.apache.rocketmq.tieredstore.common.AppendResult;
 import org.apache.rocketmq.tieredstore.file.FlatFileFactory;
 import org.apache.rocketmq.tieredstore.metadata.DefaultMetadataStore;
@@ -61,6 +62,7 @@ public class IndexStoreServiceBenchTest {
     private static final Logger log = 
LoggerFactory.getLogger(MessageStoreUtil.TIERED_STORE_LOGGER_NAME);
     private static final String TOPIC_NAME = "TopicTest";
     private MessageStoreConfig storeConfig;
+    private MessageStoreExecutor executor;
     private IndexStoreService indexStoreService;
     private final LongAdder failureCount = new LongAdder();
 
@@ -77,7 +79,8 @@ public class IndexStoreServiceBenchTest {
         storeConfig.setTieredStoreIndexFileMaxHashSlotNum(500 * 1000);
         storeConfig.setTieredStoreIndexFileMaxIndexNum(2000 * 1000);
         MetadataStore metadataStore = new DefaultMetadataStore(storeConfig);
-        FlatFileFactory flatFileFactory = new FlatFileFactory(metadataStore, 
storeConfig);
+        executor = new MessageStoreExecutor();
+        FlatFileFactory flatFileFactory = new FlatFileFactory(metadataStore, 
storeConfig, executor);
         indexStoreService = new IndexStoreService(flatFileFactory, storePath);
         indexStoreService.start();
     }
@@ -86,6 +89,7 @@ public class IndexStoreServiceBenchTest {
     public void shutdown() throws IOException {
         indexStoreService.shutdown();
         indexStoreService.destroy();
+        executor.shutdown();
     }
 
     //@Benchmark
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java
index 4736d5f585..0d849d5927 100644
--- 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/index/IndexStoreServiceTest.java
@@ -37,6 +37,7 @@ import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.rocketmq.common.ThreadFactoryImpl;
 import org.apache.rocketmq.store.logfile.DefaultMappedFile;
 import org.apache.rocketmq.tieredstore.MessageStoreConfig;
+import org.apache.rocketmq.tieredstore.MessageStoreExecutor;
 import org.apache.rocketmq.tieredstore.common.AppendResult;
 import org.apache.rocketmq.tieredstore.file.FlatAppendFile;
 import org.apache.rocketmq.tieredstore.file.FlatFileFactory;
@@ -67,6 +68,7 @@ public class IndexStoreServiceTest {
 
     private String filePath;
     private MessageStoreConfig storeConfig;
+    private MessageStoreExecutor executor;
     private FlatFileFactory fileAllocator;
     private IndexStoreService indexService;
 
@@ -81,7 +83,8 @@ public class IndexStoreServiceTest {
         storeConfig.setTieredStoreIndexFileMaxIndexNum(20);
         
storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.tieredstore.provider.PosixFileSegment");
         MetadataStore metadataStore = new DefaultMetadataStore(storeConfig);
-        fileAllocator = new FlatFileFactory(metadataStore, storeConfig);
+        executor = new MessageStoreExecutor();
+        fileAllocator = new FlatFileFactory(metadataStore, storeConfig, 
executor);
     }
 
     @After
@@ -90,6 +93,7 @@ public class IndexStoreServiceTest {
             indexService.shutdown();
             indexService.destroy();
         }
+        executor.shutdown();
         
MessageStoreUtilTest.deleteStoreDirectory(storeConfig.getTieredStoreFilePath());
     }
 


Reply via email to