This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new c78061bf6c [ISSUE#7280] Fix and refactor handle commit exception in 
tiered storage (#7281)
c78061bf6c is described below

commit c78061bf6ca5f35452510ec4107c46735c51c316
Author: lizhimins <707364...@qq.com>
AuthorDate: Wed Aug 30 22:29:51 2023 +0800

    [ISSUE#7280] Fix and refactor handle commit exception in tiered storage 
(#7281)
    
    * refactor handle commit exception
    
    * refactor handle commit exception
    
    * fix handle commit exception
---
 .../rocketmq/tieredstore/TieredDispatcher.java     |   3 +-
 .../rocketmq/tieredstore/TieredMessageFetcher.java |  57 ++--
 .../rocketmq/tieredstore/TieredMessageStore.java   |  26 +-
 .../tieredstore/provider/TieredFileSegment.java    | 291 ++++++++++++---------
 .../tieredstore/provider/TieredStoreProvider.java  |   8 +-
 .../provider/posix/PosixFileSegment.java           |   4 +-
 .../CommitLogInputStream.java}                     |  30 ++-
 .../FileSegmentInputStream.java}                   |  49 ++--
 .../FileSegmentInputStreamFactory.java}            |  26 +-
 .../tieredstore/TieredMessageStoreTest.java        |  14 +-
 .../tieredstore/file/TieredFlatFileTest.java       |   2 +
 .../tieredstore/file/TieredIndexFileTest.java      |   2 +
 ...Stream.java => MockFileSegmentInputStream.java} |   8 +-
 .../provider/TieredFileSegmentInputStreamTest.java |  24 +-
 .../provider/TieredFileSegmentTest.java            |  89 ++++++-
 .../provider/memory/MemoryFileSegment.java         |  27 +-
 .../memory/MemoryFileSegmentWithoutCheck.java      |   4 +-
 17 files changed, 427 insertions(+), 237 deletions(-)

diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
index 1746190cdb..430c2b62eb 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
@@ -318,8 +318,7 @@ public class TieredDispatcher extends ServiceThread 
implements CommitLogDispatch
                         continue;
                     case FILE_CLOSED:
                         
tieredFlatFileManager.destroyCompositeFile(flatFile.getMessageQueue());
-                        logger.info("TieredDispatcher#dispatchFlatFile: file 
has been close and destroy, " +
-                            "topic: {}, queueId: {}", topic, queueId);
+                        logger.info("File has been closed and destroy, topic: 
{}, queueId: {}", topic, queueId);
                         return;
                     default:
                         dispatchOffset--;
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
index 9a9a3e5a5c..766ff64f6c 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java
@@ -273,15 +273,17 @@ public class TieredMessageFetcher implements 
MessageStoreFetcher {
             TieredStoreMetricsManager.cacheHit.add(resultWrapperList.size(), 
attributes);
         }
 
-        // if no cached message found and there is currently an inflight 
request, wait for the request to end before continuing
+        // If there are no messages in the cache and there are currently 
requests being pulled,
+        // we need to wait for the request to return before continuing.
         if (resultWrapperList.isEmpty() && waitInflightRequest) {
-            CompletableFuture<Long> future = 
flatFile.getInflightRequest(group, queueOffset, maxCount)
-                .getFuture(queueOffset);
+            CompletableFuture<Long> future =
+                flatFile.getInflightRequest(group, queueOffset, 
maxCount).getFuture(queueOffset);
             if (!future.isDone()) {
                 Stopwatch stopwatch = Stopwatch.createStarted();
                 // to prevent starvation issues, only allow waiting for 
inflight request once
                 return future.thenCompose(v -> {
-                    
LOGGER.debug("TieredMessageFetcher#getMessageFromCacheAsync: wait for inflight 
request cost: {}ms", stopwatch.elapsed(TimeUnit.MILLISECONDS));
+                    LOGGER.debug("MessageFetcher#getMessageFromCacheAsync: 
wait for response cost: {}ms",
+                        stopwatch.elapsed(TimeUnit.MILLISECONDS));
                     return getMessageFromCacheAsync(flatFile, group, 
queueOffset, maxCount, false);
                 });
             }
@@ -302,7 +304,8 @@ public class TieredMessageFetcher implements 
MessageStoreFetcher {
 
         // if cache hit, result will be returned immediately and 
asynchronously prefetch messages for later requests
         if (!resultWrapperList.isEmpty()) {
-            LOGGER.debug("TieredMessageFetcher#getMessageFromCacheAsync: cache 
hit: topic: {}, queue: {}, queue offset: {}, max message num: {}, cache hit 
num: {}",
+            LOGGER.debug("MessageFetcher#getMessageFromCacheAsync: cache hit: 
" +
+                    "topic: {}, queue: {}, queue offset: {}, max message num: 
{}, cache hit num: {}",
                 mq.getTopic(), mq.getQueueId(), queueOffset, maxCount, 
resultWrapperList.size());
             prefetchMessage(flatFile, group, maxCount, lastGetOffset + 1);
 
@@ -316,8 +319,10 @@ public class TieredMessageFetcher implements 
MessageStoreFetcher {
         }
 
         // if cache is miss, immediately pull messages
-        LOGGER.warn("TieredMessageFetcher#getMessageFromCacheAsync: cache 
miss: topic: {}, queue: {}, queue offset: {}, max message num: {}",
+        LOGGER.warn("TieredMessageFetcher#getMessageFromCacheAsync: cache 
miss: " +
+                "topic: {}, queue: {}, queue offset: {}, max message num: {}",
             mq.getTopic(), mq.getQueueId(), queueOffset, maxCount);
+
         CompletableFuture<GetMessageResult> resultFuture;
         synchronized (flatFile) {
             int batchSize = maxCount * storeConfig.getReadAheadMinFactor();
@@ -453,42 +458,42 @@ public class TieredMessageFetcher implements 
MessageStoreFetcher {
     public CompletableFuture<GetMessageResult> getMessageAsync(
         String group, String topic, int queueId, long queueOffset, int 
maxCount, final MessageFilter messageFilter) {
 
+        GetMessageResult result = new GetMessageResult();
         CompositeQueueFlatFile flatFile = flatFileManager.getFlatFile(new 
MessageQueue(topic, brokerName, queueId));
+
         if (flatFile == null) {
-            GetMessageResult result = new GetMessageResult();
             result.setNextBeginOffset(queueOffset);
             result.setStatus(GetMessageStatus.NO_MATCHED_LOGIC_QUEUE);
             return CompletableFuture.completedFuture(result);
         }
 
-        GetMessageResult result = new GetMessageResult();
-        long minQueueOffset = flatFile.getConsumeQueueMinOffset();
-        long maxQueueOffset = flatFile.getConsumeQueueCommitOffset();
-        result.setMinOffset(minQueueOffset);
-        result.setMaxOffset(maxQueueOffset);
+        // Max queue offset means next message put position
+        result.setMinOffset(flatFile.getConsumeQueueMinOffset());
+        result.setMaxOffset(flatFile.getConsumeQueueCommitOffset());
+
+        // Fill result according to the file offset.
+        // Offset range  | Result           | Fix to
+        // (-oo, 0]      | no message       | current offset
+        // (0, min)      | too small        | min offset
+        // [min, max)    | correct          |
+        // [max, max]    | overflow one     | max offset
+        // (max, +oo)    | overflow badly   | max offset
 
-        if (flatFile.getConsumeQueueCommitOffset() <= 0) {
+        if (result.getMaxOffset() <= 0) {
             result.setStatus(GetMessageStatus.NO_MESSAGE_IN_QUEUE);
             result.setNextBeginOffset(queueOffset);
             return CompletableFuture.completedFuture(result);
-        }
-
-        // request range | result
-        // (0, min)      | too small
-        // [min, max)    | correct
-        // [max, max]    | overflow one
-        // (max, +oo)    | overflow badly
-        if (queueOffset < minQueueOffset) {
+        } else if (queueOffset < result.getMinOffset()) {
             result.setStatus(GetMessageStatus.OFFSET_TOO_SMALL);
-            result.setNextBeginOffset(flatFile.getConsumeQueueMinOffset());
+            result.setNextBeginOffset(result.getMinOffset());
             return CompletableFuture.completedFuture(result);
-        } else if (queueOffset == maxQueueOffset) {
+        } else if (queueOffset == result.getMaxOffset()) {
             result.setStatus(GetMessageStatus.OFFSET_OVERFLOW_ONE);
-            result.setNextBeginOffset(flatFile.getConsumeQueueCommitOffset());
+            result.setNextBeginOffset(result.getMaxOffset());
             return CompletableFuture.completedFuture(result);
-        } else if (queueOffset > maxQueueOffset) {
+        } else if (queueOffset > result.getMaxOffset()) {
             result.setStatus(GetMessageStatus.OFFSET_OVERFLOW_BADLY);
-            result.setNextBeginOffset(flatFile.getConsumeQueueCommitOffset());
+            result.setNextBeginOffset(result.getMaxOffset());
             return CompletableFuture.completedFuture(result);
         }
 
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
index 5240ac8e9e..78e855f365 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java
@@ -99,11 +99,11 @@ public class TieredMessageStore extends 
AbstractPluginMessageStore {
         return storeConfig;
     }
 
-    public boolean viaTieredStorage(String topic, int queueId, long offset) {
-        return viaTieredStorage(topic, queueId, offset, 1);
+    public boolean fetchFromCurrentStore(String topic, int queueId, long 
offset) {
+        return fetchFromCurrentStore(topic, queueId, offset, 1);
     }
 
-    public boolean viaTieredStorage(String topic, int queueId, long offset, 
int batchSize) {
+    public boolean fetchFromCurrentStore(String topic, int queueId, long 
offset, int batchSize) {
         TieredMessageStoreConfig.TieredStorageLevel deepStorageLevel = 
storeConfig.getTieredStorageLevel();
 
         if 
(deepStorageLevel.check(TieredMessageStoreConfig.TieredStorageLevel.FORCE)) {
@@ -146,8 +146,10 @@ public class TieredMessageStore extends 
AbstractPluginMessageStore {
     public CompletableFuture<GetMessageResult> getMessageAsync(String group, 
String topic,
         int queueId, long offset, int maxMsgNums, MessageFilter messageFilter) 
{
 
-        if (!viaTieredStorage(topic, queueId, offset, maxMsgNums)) {
-            logger.trace("GetMessageAsync from next store topic: {}, queue: 
{}, offset: {}", topic, queueId, offset);
+        if (fetchFromCurrentStore(topic, queueId, offset, maxMsgNums)) {
+            logger.trace("GetMessageAsync from current store, topic: {}, 
queue: {}, offset: {}", topic, queueId, offset);
+        } else {
+            logger.trace("GetMessageAsync from next store, topic: {}, queue: 
{}, offset: {}", topic, queueId, offset);
             return next.getMessageAsync(group, topic, queueId, offset, 
maxMsgNums, messageFilter);
         }
 
@@ -168,14 +170,14 @@ public class TieredMessageStore extends 
AbstractPluginMessageStore {
 
                     if (next.checkInStoreByConsumeOffset(topic, queueId, 
offset)) {
                         TieredStoreMetricsManager.fallbackTotal.add(1, 
latencyAttributes);
-                        logger.debug("GetMessageAsync not found then try back 
to next store, result: {}, " +
+                        logger.debug("GetMessageAsync not found, then back to 
next store, result: {}, " +
                                 "topic: {}, queue: {}, queue offset: {}, 
offset range: {}-{}",
                             result.getStatus(), topic, queueId, offset, 
result.getMinOffset(), result.getMaxOffset());
                         return next.getMessage(group, topic, queueId, offset, 
maxMsgNums, messageFilter);
                     }
                 }
 
-                // system topic
+                // Fetch system topic data from the broker when using the 
force level.
                 if (result.getStatus() == 
GetMessageStatus.NO_MATCHED_LOGIC_QUEUE) {
                     if (TieredStoreUtil.isSystemTopic(topic) || 
PopAckConstants.isStartWithRevivePrefix(topic)) {
                         return next.getMessage(group, topic, queueId, offset, 
maxMsgNums, messageFilter);
@@ -198,7 +200,7 @@ public class TieredMessageStore extends 
AbstractPluginMessageStore {
                     
TieredStoreMetricsManager.messagesOutTotal.add(result.getMessageCount(), 
messagesOutAttributes);
                 }
 
-                // fix min or max offset according next store
+                // Finally, fix the min or max offset according to the next store
                 long minOffsetInQueue = next.getMinOffsetInQueue(topic, 
queueId);
                 if (minOffsetInQueue >= 0 && minOffsetInQueue < 
result.getMinOffset()) {
                     result.setMinOffset(minOffsetInQueue);
@@ -209,7 +211,7 @@ public class TieredMessageStore extends 
AbstractPluginMessageStore {
                 }
                 return result;
             }).exceptionally(e -> {
-                logger.error("GetMessageAsync from tiered store failed: ", e);
+                logger.error("GetMessageAsync from tiered store failed", e);
                 return next.getMessage(group, topic, queueId, offset, 
maxMsgNums, messageFilter);
             });
     }
@@ -251,7 +253,7 @@ public class TieredMessageStore extends 
AbstractPluginMessageStore {
                     .build();
                 
TieredStoreMetricsManager.apiLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS),
 latencyAttributes);
                 if (time < 0) {
-                    
logger.debug("TieredMessageStore#getEarliestMessageTimeAsync: get earliest 
message time failed, try to get earliest message time from next store: topic: 
{}, queue: {}",
+                    logger.debug("GetEarliestMessageTimeAsync failed, try to 
get earliest message time from next store: topic: {}, queue: {}",
                         topic, queueId);
                     return finalNextEarliestMessageTime != Long.MAX_VALUE ? 
finalNextEarliestMessageTime : -1;
                 }
@@ -262,7 +264,7 @@ public class TieredMessageStore extends 
AbstractPluginMessageStore {
     @Override
     public CompletableFuture<Long> getMessageStoreTimeStampAsync(String topic, 
int queueId,
         long consumeQueueOffset) {
-        if (viaTieredStorage(topic, queueId, consumeQueueOffset)) {
+        if (fetchFromCurrentStore(topic, queueId, consumeQueueOffset)) {
             Stopwatch stopwatch = Stopwatch.createStarted();
             return fetcher.getMessageStoreTimeStampAsync(topic, queueId, 
consumeQueueOffset)
                 .thenApply(time -> {
@@ -272,7 +274,7 @@ public class TieredMessageStore extends 
AbstractPluginMessageStore {
                         .build();
                     
TieredStoreMetricsManager.apiLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS),
 latencyAttributes);
                     if (time == -1) {
-                        
logger.debug("TieredMessageStore#getMessageStoreTimeStampAsync: get message 
time failed, try to get message time from next store: topic: {}, queue: {}, 
queue offset: {}",
+                        logger.debug("GetEarliestMessageTimeAsync failed, try 
to get message time from next store, topic: {}, queue: {}, queue offset: {}",
                             topic, queueId, consumeQueueOffset);
                         return next.getMessageStoreTimeStamp(topic, queueId, 
consumeQueueOffset);
                     }
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java
index 5062c7d9e7..32911a6e89 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java
@@ -16,14 +16,11 @@
  */
 package org.apache.rocketmq.tieredstore.provider;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Stopwatch;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -35,8 +32,8 @@ import 
org.apache.rocketmq.tieredstore.exception.TieredStoreException;
 import org.apache.rocketmq.tieredstore.file.TieredCommitLog;
 import org.apache.rocketmq.tieredstore.file.TieredConsumeQueue;
 import org.apache.rocketmq.tieredstore.file.TieredIndexFile;
-import 
org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
-import 
org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStreamFactory;
+import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream;
+import 
org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStreamFactory;
 import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
 import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
 
@@ -50,22 +47,23 @@ public abstract class TieredFileSegment implements 
Comparable<TieredFileSegment>
     protected final TieredMessageStoreConfig storeConfig;
 
     private final long maxSize;
-    private final ReentrantLock bufferLock;
-    private final Semaphore commitLock;
+    private final ReentrantLock bufferLock = new ReentrantLock();
+    private final Semaphore commitLock = new Semaphore(1);
 
-    private volatile boolean full;
-    private volatile boolean closed;
+    private volatile boolean full = false;
+    private volatile boolean closed = false;
 
-    private volatile long minTimestamp;
-    private volatile long maxTimestamp;
-    private volatile long commitPosition;
-    private volatile long appendPosition;
+    private volatile long minTimestamp = Long.MAX_VALUE;
+    private volatile long maxTimestamp = Long.MAX_VALUE;
+    private volatile long commitPosition = 0L;
+    private volatile long appendPosition = 0L;
 
     // only used in commitLog
-    private volatile long dispatchCommitOffset = 0;
+    private volatile long dispatchCommitOffset = 0L;
 
     private ByteBuffer codaBuffer;
-    private List<ByteBuffer> uploadBufferList = new ArrayList<>();
+    private List<ByteBuffer> bufferList = new ArrayList<>();
+    private FileSegmentInputStream fileSegmentInputStream;
     private CompletableFuture<Boolean> flightCommitRequest = 
CompletableFuture.completedFuture(false);
 
     public TieredFileSegment(TieredMessageStoreConfig storeConfig,
@@ -75,21 +73,13 @@ public abstract class TieredFileSegment implements 
Comparable<TieredFileSegment>
         this.fileType = fileType;
         this.filePath = filePath;
         this.baseOffset = baseOffset;
-
-        this.closed = false;
-        this.bufferLock = new ReentrantLock();
-        this.commitLock = new Semaphore(1);
-
-        this.commitPosition = 0L;
-        this.appendPosition = 0L;
-        this.minTimestamp = Long.MAX_VALUE;
-        this.maxTimestamp = Long.MAX_VALUE;
-
-        // The max segment size of a file is determined by the file type
-        this.maxSize = getMaxSizeAccordingFileType(storeConfig);
+        this.maxSize = getMaxSizeByFileType();
     }
 
-    private long getMaxSizeAccordingFileType(TieredMessageStoreConfig 
storeConfig) {
+    /**
+     * The max segment size of a file is determined by the file type
+     */
+    protected long getMaxSizeByFileType() {
         switch (fileType) {
             case COMMIT_LOG:
                 return storeConfig.getTieredStoreCommitLogMaxSize();
@@ -184,39 +174,23 @@ public abstract class TieredFileSegment implements 
Comparable<TieredFileSegment>
         this.appendPosition = pos;
     }
 
-    private List<ByteBuffer> rollingUploadBuffer() {
+    private List<ByteBuffer> borrowBuffer() {
         bufferLock.lock();
         try {
-            List<ByteBuffer> tmp = uploadBufferList;
-            uploadBufferList = new ArrayList<>();
+            List<ByteBuffer> tmp = bufferList;
+            bufferList = new ArrayList<>();
             return tmp;
         } finally {
             bufferLock.unlock();
         }
     }
 
-    private void sendBackBuffer(TieredFileSegmentInputStream inputStream) {
-        bufferLock.lock();
-        try {
-            List<ByteBuffer> tmpBufferList = inputStream.getUploadBufferList();
-            for (ByteBuffer buffer : tmpBufferList) {
-                buffer.rewind();
-            }
-            tmpBufferList.addAll(uploadBufferList);
-            uploadBufferList = tmpBufferList;
-            if (inputStream.getCodaBuffer() != null) {
-                codaBuffer.rewind();
-            }
-        } finally {
-            bufferLock.unlock();
-        }
-    }
-
     @SuppressWarnings("NonAtomicOperationOnVolatileField")
-    public AppendResult append(ByteBuffer byteBuf, long timeStamp) {
+    public AppendResult append(ByteBuffer byteBuf, long timestamp) {
         if (closed) {
             return AppendResult.FILE_CLOSED;
         }
+
         bufferLock.lock();
         try {
             if (full || codaBuffer != null) {
@@ -227,7 +201,8 @@ public abstract class TieredFileSegment implements 
Comparable<TieredFileSegment>
                 minTimestamp = 
byteBuf.getLong(TieredIndexFile.INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION);
                 maxTimestamp = 
byteBuf.getLong(TieredIndexFile.INDEX_FILE_HEADER_END_TIME_STAMP_POSITION);
                 appendPosition += byteBuf.remaining();
-                uploadBufferList.add(byteBuf);
+                // IndexFile is large and does not change after compaction, so 
no deep copy is needed
+                bufferList.add(byteBuf);
                 setFull();
                 return AppendResult.SUCCESS;
             }
@@ -236,23 +211,34 @@ public abstract class TieredFileSegment implements 
Comparable<TieredFileSegment>
                 setFull();
                 return AppendResult.FILE_FULL;
             }
-            if (uploadBufferList.size() > 
storeConfig.getTieredStoreGroupCommitCount()
+
+            if (bufferList.size() > 
storeConfig.getTieredStoreGroupCommitCount()
                 || appendPosition - commitPosition > 
storeConfig.getTieredStoreGroupCommitSize()) {
                 commitAsync();
             }
-            if (uploadBufferList.size() > 
storeConfig.getTieredStoreMaxGroupCommitCount()) {
-                logger.debug("TieredFileSegment#append: buffer full: file: {}, 
upload buffer size: {}",
-                    getPath(), uploadBufferList.size());
+
+            if (bufferList.size() > 
storeConfig.getTieredStoreMaxGroupCommitCount()) {
+                logger.debug("File segment append buffer full, file: {}, 
buffer size: {}, pending bytes: {}",
+                    getPath(), bufferList.size(), appendPosition - 
commitPosition);
                 return AppendResult.BUFFER_FULL;
             }
-            if (timeStamp != Long.MAX_VALUE) {
-                maxTimestamp = timeStamp;
+
+            if (timestamp != Long.MAX_VALUE) {
+                maxTimestamp = timestamp;
                 if (minTimestamp == Long.MAX_VALUE) {
-                    minTimestamp = timeStamp;
+                    minTimestamp = timestamp;
                 }
             }
+
             appendPosition += byteBuf.remaining();
-            uploadBufferList.add(byteBuf);
+
+            // deep copy buffer
+            ByteBuffer byteBuffer = 
ByteBuffer.allocateDirect(byteBuf.remaining());
+            byteBuffer.put(byteBuf);
+            byteBuffer.flip();
+            byteBuf.rewind();
+
+            bufferList.add(byteBuffer);
             return AppendResult.SUCCESS;
         } finally {
             bufferLock.unlock();
@@ -267,7 +253,6 @@ public abstract class TieredFileSegment implements 
Comparable<TieredFileSegment>
         return appendPosition;
     }
 
-    @VisibleForTesting
     public void setAppendPosition(long appendPosition) {
         this.appendPosition = appendPosition;
     }
@@ -333,6 +318,8 @@ public abstract class TieredFileSegment implements 
Comparable<TieredFileSegment>
         if (closed) {
             return false;
         }
+        // result is false when a commit request is already in flight or this commit failed,
+        // so join the in-flight request to wait for it to finish
         Boolean result = commitAsync().join();
         if (!result) {
             result = flightCommitRequest.join();
@@ -340,92 +327,156 @@ public abstract class TieredFileSegment implements 
Comparable<TieredFileSegment>
         return result;
     }
 
+    private void releaseCommitLock() {
+        if (commitLock.availablePermits() == 0) {
+            commitLock.release();
+        } else {
+            logger.error("[Bug] FileSegmentCommitAsync, lock is already 
released: available permits: {}",
+                commitLock.availablePermits());
+        }
+    }
+
+    private void updateDispatchCommitOffset(List<ByteBuffer> bufferList) {
+        if (fileType == FileSegmentType.COMMIT_LOG && bufferList.size() > 0) {
+            dispatchCommitOffset =
+                
MessageBufferUtil.getQueueOffset(bufferList.get(bufferList.size() - 1));
+        }
+    }
+
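+    /**
+     * @return true if there was nothing to commit or the commit succeeded; false if the file is closed, a commit is already in flight, or the commit failed
+     */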
     @SuppressWarnings("NonAtomicOperationOnVolatileField")
     public CompletableFuture<Boolean> commitAsync() {
         if (closed) {
             return CompletableFuture.completedFuture(false);
         }
-        Stopwatch stopwatch = Stopwatch.createStarted();
+
         if (!needCommit()) {
             return CompletableFuture.completedFuture(true);
         }
-        try {
-            int permits = commitLock.drainPermits();
-            if (permits <= 0) {
-                return CompletableFuture.completedFuture(false);
-            }
-        } catch (Exception e) {
+
+        if (commitLock.drainPermits() <= 0) {
             return CompletableFuture.completedFuture(false);
         }
-        List<ByteBuffer> bufferList = rollingUploadBuffer();
-        int bufferSize = 0;
-        for (ByteBuffer buffer : bufferList) {
-            bufferSize += buffer.remaining();
-        }
-        if (codaBuffer != null) {
-            bufferSize += codaBuffer.remaining();
-        }
-        if (bufferSize == 0) {
-            return CompletableFuture.completedFuture(true);
-        }
-        TieredFileSegmentInputStream inputStream = 
TieredFileSegmentInputStreamFactory.build(
-            fileType, baseOffset + commitPosition, bufferList, codaBuffer, 
bufferSize);
-        int finalBufferSize = bufferSize;
+
         try {
-            flightCommitRequest = commit0(inputStream, commitPosition, 
bufferSize, fileType != FileSegmentType.INDEX)
+            if (fileSegmentInputStream != null) {
+                long fileSize = this.getSize();
+                if (fileSize == -1L) {
+                    logger.error("Get commit position error before commit, 
Commit: %d, Expect: %d, Current Max: %d, FileName: %s",
+                        commitPosition, commitPosition + 
fileSegmentInputStream.getContentLength(), appendPosition, getPath());
+                    releaseCommitLock();
+                    return CompletableFuture.completedFuture(false);
+                } else {
+                    if (correctPosition(fileSize, null)) {
+                        
updateDispatchCommitOffset(fileSegmentInputStream.getBufferList());
+                        fileSegmentInputStream = null;
+                    }
+                }
+            }
+
+            int bufferSize;
+            if (fileSegmentInputStream != null) {
+                bufferSize = fileSegmentInputStream.available();
+            } else {
+                List<ByteBuffer> bufferList = borrowBuffer();
+                bufferSize = 
bufferList.stream().mapToInt(ByteBuffer::remaining).sum()
+                    + (codaBuffer != null ? codaBuffer.remaining() : 0);
+                if (bufferSize == 0) {
+                    releaseCommitLock();
+                    return CompletableFuture.completedFuture(true);
+                }
+                fileSegmentInputStream = FileSegmentInputStreamFactory.build(
+                    fileType, baseOffset + commitPosition, bufferList, 
codaBuffer, bufferSize);
+            }
+
+            return flightCommitRequest = this
+                .commit0(fileSegmentInputStream, commitPosition, bufferSize, 
fileType != FileSegmentType.INDEX)
                 .thenApply(result -> {
                     if (result) {
-                        if (fileType == FileSegmentType.COMMIT_LOG && 
bufferList.size() > 0) {
-                            dispatchCommitOffset = 
MessageBufferUtil.getQueueOffset(bufferList.get(bufferList.size() - 1));
-                        }
-                        commitPosition += finalBufferSize;
+                        
updateDispatchCommitOffset(fileSegmentInputStream.getBufferList());
+                        commitPosition += bufferSize;
+                        fileSegmentInputStream = null;
                         return true;
-                    }
-                    sendBackBuffer(inputStream);
-                    return false;
-                })
-                .exceptionally(e -> handleCommitException(inputStream, e))
-                .whenComplete((result, e) -> {
-                    if (commitLock.availablePermits() == 0) {
-                        logger.debug("TieredFileSegment#commitAsync: commit 
cost: {}ms, file: {}, item count: {}, buffer size: {}", 
stopwatch.elapsed(TimeUnit.MILLISECONDS), getPath(), bufferList.size(), 
finalBufferSize);
-                        commitLock.release();
                     } else {
-                        logger.error("[Bug]TieredFileSegment#commitAsync: 
commit lock is already released: available permits: {}", 
commitLock.availablePermits());
+                        fileSegmentInputStream.rewind();
+                        return false;
                     }
-                });
-            return flightCommitRequest;
+                })
+                .exceptionally(this::handleCommitException)
+                .whenComplete((result, e) -> releaseCommitLock());
+
         } catch (Exception e) {
-            handleCommitException(inputStream, e);
-            if (commitLock.availablePermits() == 0) {
-                logger.debug("TieredFileSegment#commitAsync: commit cost: 
{}ms, file: {}, item count: {}, buffer size: {}", 
stopwatch.elapsed(TimeUnit.MILLISECONDS), getPath(), bufferList.size(), 
finalBufferSize);
-                commitLock.release();
-            } else {
-                logger.error("[Bug]TieredFileSegment#commitAsync: commit lock 
is already released: available permits: {}", commitLock.availablePermits());
-            }
+            handleCommitException(e);
+            releaseCommitLock();
         }
         return CompletableFuture.completedFuture(false);
     }
 
-    private boolean handleCommitException(TieredFileSegmentInputStream 
inputStream, Throwable e) {
+    private long getCorrectFileSize(Throwable throwable) {
+        if (throwable instanceof TieredStoreException) {
+            long fileSize = ((TieredStoreException) throwable).getPosition();
+            if (fileSize > 0) {
+                return fileSize;
+            }
+        }
+        return getSize();
+    }
+
+    private boolean handleCommitException(Throwable e) {
+        // Unwrap one level of wrapping (e.g. added by the future chain) to get the cause
         Throwable cause = e.getCause() != null ? e.getCause() : e;
-        sendBackBuffer(inputStream);
-        long realSize = 0;
-        if (cause instanceof TieredStoreException && ((TieredStoreException) 
cause).getPosition() > 0) {
-            realSize = ((TieredStoreException) cause).getPosition();
+        long fileSize = this.getCorrectFileSize(cause);
+        // -1 means the size lookup itself failed: log, rewind the stream and keep the buffer
+        if (fileSize == -1L) {
+            logger.error("Get commit position error, Commit: {}, Expect: {}, 
Current Max: {}, FileName: {}",
+                commitPosition, commitPosition + 
fileSegmentInputStream.getContentLength(), appendPosition, getPath());
+            fileSegmentInputStream.rewind();
+            return false;
+        }
+
+        if (correctPosition(fileSize, cause)) {
+            updateDispatchCommitOffset(fileSegmentInputStream.getBufferList());
+            fileSegmentInputStream = null;
+            return true;
+        } else {
+            fileSegmentInputStream.rewind();
+            return false;
         }
-        if (realSize <= 0) {
-            realSize = getSize();
+    }
+
+    /**
+     * return true to clear buffer
+     */
+    private boolean correctPosition(long fileSize, Throwable throwable) {
+
+        // Currently we have three offsets here: commit offset, expected offset, 
file size.
+        // We guarantee that the commit offset is less than or equal to the 
expected offset.
+        // Max offset will increase because we can continuously put in new 
buffers.
+        String handleInfo = throwable == null ? "before commit" : "after 
commit";
+        long expectPosition = commitPosition + 
fileSegmentInputStream.getContentLength();
+
+        String offsetInfo = String.format("Correct Commit Position, %s, 
result=[{}], " +
+                "Commit: %d, Expect: %d, Current Max: %d, FileSize: %d, 
FileName: %s",
+            handleInfo, commitPosition, expectPosition, appendPosition, 
fileSize, this.getPath());
+
+        // We trust that the file size returned by the server is 
correct,
+        // so we can reset the commit offset to the file size reported by the 
storage system.
+        if (fileSize == expectPosition) {
+            logger.info(offsetInfo, "Success", throwable);
+            commitPosition = fileSize;
+            return true;
         }
-        if (realSize > 0 && realSize > commitPosition) {
-            logger.error("TieredFileSegment#handleCommitException: commit 
failed: file: {}, try to fix position: origin: {}, real: {}", getPath(), 
commitPosition, realSize, cause);
-            // TODO check if this diff part is uploaded to backend storage
-            long diff = appendPosition - commitPosition;
-            commitPosition = realSize;
-            appendPosition = realSize + diff;
-            // TODO check if appendPosition is large than maxOffset
-        } else if (realSize < commitPosition) {
-            logger.error("[Bug]TieredFileSegment#handleCommitException: commit 
failed: file: {}, can not fix position: origin: {}, real: {}", getPath(), 
commitPosition, realSize, cause);
+
+        if (fileSize < commitPosition) {
+            logger.error(offsetInfo, "FileSizeIncorrect", throwable);
+        } else if (fileSize == commitPosition) {
+            logger.warn(offsetInfo, "CommitFailed", throwable);
+        } else if (fileSize > commitPosition) {
+            logger.warn(offsetInfo, "PartialSuccess", throwable);
         }
+        commitPosition = fileSize;
         return false;
     }
 }
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java
index 5a0ca25f59..0db3eaf8f4 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java
@@ -18,7 +18,7 @@ package org.apache.rocketmq.tieredstore.provider;
 
 import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
-import 
org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
+import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream;
 
 public interface TieredStoreProvider {
 
@@ -30,7 +30,9 @@ public interface TieredStoreProvider {
     String getPath();
 
     /**
-     * Get file size in backend file system
+     * Get the real length of the file.
+     * Returns 0 if the file does not exist,
+     * returns -1 if the file size could not be retrieved.
      *
      * @return file real size
      */
@@ -71,5 +73,5 @@ public interface TieredStoreProvider {
      * @param append try to append or create a new file
      * @return put result, <code>true</code> if data successfully write; 
<code>false</code> otherwise
      */
-    CompletableFuture<Boolean> commit0(TieredFileSegmentInputStream 
inputStream,long position, int length, boolean append);
+    CompletableFuture<Boolean> commit0(FileSegmentInputStream inputStream,long 
position, int length, boolean append);
 }
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
index 52be90b1df..7e949cb28c 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
@@ -36,7 +36,7 @@ import 
org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
 import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
 import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsManager;
 import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
-import 
org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
+import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream;
 import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
 
 import static 
org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_FILE_TYPE;
@@ -184,7 +184,7 @@ public class PosixFileSegment extends TieredFileSegment {
 
     @Override
     public CompletableFuture<Boolean> commit0(
-        TieredFileSegmentInputStream inputStream, long position, int length, 
boolean append) {
+        FileSegmentInputStream inputStream, long position, int length, boolean 
append) {
 
         Stopwatch stopwatch = Stopwatch.createStarted();
         AttributesBuilder attributesBuilder = newAttributesBuilder()
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredCommitLogInputStream.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/CommitLogInputStream.java
similarity index 88%
rename from 
tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredCommitLogInputStream.java
rename to 
tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/CommitLogInputStream.java
index c70bb76562..13b6e0ef9c 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredCommitLogInputStream.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/CommitLogInputStream.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.tieredstore.provider.inputstream;
+package org.apache.rocketmq.tieredstore.provider.stream;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -23,20 +23,23 @@ import java.util.List;
 import org.apache.rocketmq.tieredstore.common.FileSegmentType;
 import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
 
-public class TieredCommitLogInputStream extends TieredFileSegmentInputStream {
+public class CommitLogInputStream extends FileSegmentInputStream {
 
     /**
      * commitLogOffset is the real physical offset of the commitLog buffer 
which is being read
      */
+    private final long startCommitLogOffset;
+
     private long commitLogOffset;
 
     private final ByteBuffer codaBuffer;
 
     private long markCommitLogOffset = -1;
 
-    public TieredCommitLogInputStream(FileSegmentType fileType, long 
startOffset,
+    public CommitLogInputStream(FileSegmentType fileType, long startOffset,
         List<ByteBuffer> uploadBufferList, ByteBuffer codaBuffer, int 
contentLength) {
         super(fileType, uploadBufferList, contentLength);
+        this.startCommitLogOffset = startOffset;
         this.commitLogOffset = startOffset;
         this.codaBuffer = codaBuffer;
     }
@@ -53,6 +56,15 @@ public class TieredCommitLogInputStream extends 
TieredFileSegmentInputStream {
         this.commitLogOffset = markCommitLogOffset;
     }
 
+    @Override
+    public synchronized void rewind() {
+        super.rewind();
+        this.commitLogOffset = this.startCommitLogOffset;
+        if (this.codaBuffer != null) {
+            this.codaBuffer.rewind();
+        }
+    }
+
     @Override
     public ByteBuffer getCodaBuffer() {
         return this.codaBuffer;
@@ -64,17 +76,17 @@ public class TieredCommitLogInputStream extends 
TieredFileSegmentInputStream {
             return -1;
         }
         readPosition++;
-        if (curReadBufferIndex >= uploadBufferList.size()) {
+        if (curReadBufferIndex >= bufferList.size()) {
             return readCoda();
         }
         int res;
         if (readPosInCurBuffer >= curBuffer.remaining()) {
             curReadBufferIndex++;
-            if (curReadBufferIndex >= uploadBufferList.size()) {
+            if (curReadBufferIndex >= bufferList.size()) {
                 readPosInCurBuffer = 0;
                 return readCoda();
             }
-            curBuffer = uploadBufferList.get(curReadBufferIndex);
+            curBuffer = bufferList.get(curReadBufferIndex);
             commitLogOffset += readPosInCurBuffer;
             readPosInCurBuffer = 0;
         }
@@ -119,9 +131,9 @@ public class TieredCommitLogInputStream extends 
TieredFileSegmentInputStream {
         int posInCurBuffer = readPosInCurBuffer;
         long curCommitLogOffset = commitLogOffset;
         ByteBuffer curBuf = curBuffer;
-        while (needRead > 0 && bufIndex <= uploadBufferList.size()) {
+        while (needRead > 0 && bufIndex <= bufferList.size()) {
             int readLen, remaining, realReadLen = 0;
-            if (bufIndex == uploadBufferList.size()) {
+            if (bufIndex == bufferList.size()) {
                 // read from coda buffer
                 remaining = codaBuffer.remaining() - posInCurBuffer;
                 readLen = Math.min(remaining, needRead);
@@ -137,7 +149,7 @@ public class TieredCommitLogInputStream extends 
TieredFileSegmentInputStream {
             }
             remaining = curBuf.remaining() - posInCurBuffer;
             readLen = Math.min(remaining, needRead);
-            curBuf = uploadBufferList.get(bufIndex);
+            curBuf = bufferList.get(bufIndex);
             if (posInCurBuffer < MessageBufferUtil.PHYSICAL_OFFSET_POSITION) {
                 realReadLen = 
Math.min(MessageBufferUtil.PHYSICAL_OFFSET_POSITION - posInCurBuffer, readLen);
                 // read from commitLog buffer
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStream.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/FileSegmentInputStream.java
similarity index 77%
rename from 
tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStream.java
rename to 
tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/FileSegmentInputStream.java
index e1758ca934..9e9d5135cd 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStream.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/FileSegmentInputStream.java
@@ -15,15 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.tieredstore.provider.inputstream;
+package org.apache.rocketmq.tieredstore.provider.stream;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.List;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.rocketmq.tieredstore.common.FileSegmentType;
 
-public class TieredFileSegmentInputStream extends InputStream {
+public class FileSegmentInputStream extends InputStream {
 
     /**
      * file type, can be commitlog, consume queue or indexfile now
@@ -33,7 +34,7 @@ public class TieredFileSegmentInputStream extends InputStream 
{
     /**
      * hold bytebuffer
      */
-    protected final List<ByteBuffer> uploadBufferList;
+    protected final List<ByteBuffer> bufferList;
 
     /**
      * total remaining of bytebuffer list
@@ -65,13 +66,13 @@ public class TieredFileSegmentInputStream extends 
InputStream {
 
     private int markReadPosInCurBuffer = -1;
 
-    public TieredFileSegmentInputStream(FileSegmentType fileType, 
List<ByteBuffer> uploadBufferList,
-        int contentLength) {
+    public FileSegmentInputStream(
+        FileSegmentType fileType, List<ByteBuffer> bufferList, int 
contentLength) {
         this.fileType = fileType;
         this.contentLength = contentLength;
-        this.uploadBufferList = uploadBufferList;
-        if (uploadBufferList != null && uploadBufferList.size() > 0) {
-            this.curBuffer = uploadBufferList.get(curReadBufferIndex);
+        this.bufferList = bufferList;
+        if (bufferList != null && bufferList.size() > 0) {
+            this.curBuffer = bufferList.get(curReadBufferIndex);
         }
     }
 
@@ -95,18 +96,34 @@ public class TieredFileSegmentInputStream extends 
InputStream {
         this.readPosition = markReadPosition;
         this.curReadBufferIndex = markCurReadBufferIndex;
         this.readPosInCurBuffer = markReadPosInCurBuffer;
-        if (this.curReadBufferIndex < uploadBufferList.size()) {
-            this.curBuffer = uploadBufferList.get(curReadBufferIndex);
+        if (this.curReadBufferIndex < bufferList.size()) {
+            this.curBuffer = bufferList.get(curReadBufferIndex);
         }
     }
 
+    public synchronized void rewind() {
+        this.readPosition = 0;
+        this.curReadBufferIndex = 0;
+        this.readPosInCurBuffer = 0;
+        if (CollectionUtils.isNotEmpty(bufferList)) {
+            this.curBuffer = bufferList.get(0);
+            for (ByteBuffer buffer : bufferList) {
+                buffer.rewind();
+            }
+        }
+    }
+
+    public int getContentLength() {
+        return contentLength;
+    }
+
     @Override
     public int available() {
         return contentLength - readPosition;
     }
 
-    public List<ByteBuffer> getUploadBufferList() {
-        return uploadBufferList;
+    public List<ByteBuffer> getBufferList() {
+        return bufferList;
     }
 
     public ByteBuffer getCodaBuffer() {
@@ -121,10 +138,10 @@ public class TieredFileSegmentInputStream extends 
InputStream {
         readPosition++;
         if (readPosInCurBuffer >= curBuffer.remaining()) {
             curReadBufferIndex++;
-            if (curReadBufferIndex >= uploadBufferList.size()) {
+            if (curReadBufferIndex >= bufferList.size()) {
                 return -1;
             }
-            curBuffer = uploadBufferList.get(curReadBufferIndex);
+            curBuffer = bufferList.get(curReadBufferIndex);
             readPosInCurBuffer = 0;
         }
         return curBuffer.get(readPosInCurBuffer++) & 0xff;
@@ -153,8 +170,8 @@ public class TieredFileSegmentInputStream extends 
InputStream {
         int bufIndex = curReadBufferIndex;
         int posInCurBuffer = readPosInCurBuffer;
         ByteBuffer curBuf = curBuffer;
-        while (needRead > 0 && bufIndex < uploadBufferList.size()) {
-            curBuf = uploadBufferList.get(bufIndex);
+        while (needRead > 0 && bufIndex < bufferList.size()) {
+            curBuf = bufferList.get(bufIndex);
             int remaining = curBuf.remaining() - posInCurBuffer;
             int readLen = Math.min(remaining, needRead);
             // read from curBuf
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStreamFactory.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/FileSegmentInputStreamFactory.java
similarity index 54%
rename from 
tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStreamFactory.java
rename to 
tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/FileSegmentInputStreamFactory.java
index d0c983fd43..a90baff3ae 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStreamFactory.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/FileSegmentInputStreamFactory.java
@@ -15,30 +15,34 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.tieredstore.provider.inputstream;
+package org.apache.rocketmq.tieredstore.provider.stream;
 
 import java.nio.ByteBuffer;
 import java.util.List;
 import org.apache.rocketmq.tieredstore.common.FileSegmentType;
 
-public class TieredFileSegmentInputStreamFactory {
+public class FileSegmentInputStreamFactory {
 
-    public static TieredFileSegmentInputStream build(FileSegmentType fileType,
-        long startOffset, List<ByteBuffer> uploadBufferList, ByteBuffer 
codaBuffer, int contentLength) {
+    public static FileSegmentInputStream build(
+        FileSegmentType fileType, long offset, List<ByteBuffer> bufferList, 
ByteBuffer byteBuffer, int length) {
+
+        if (bufferList == null) {
+            throw new IllegalArgumentException("bufferList is null");
+        }
 
         switch (fileType) {
             case COMMIT_LOG:
-                return new TieredCommitLogInputStream(
-                    fileType, startOffset, uploadBufferList, codaBuffer, 
contentLength);
+                return new CommitLogInputStream(
+                    fileType, offset, bufferList, byteBuffer, length);
             case CONSUME_QUEUE:
-                return new TieredFileSegmentInputStream(fileType, 
uploadBufferList, contentLength);
+                return new FileSegmentInputStream(fileType, bufferList, 
length);
             case INDEX:
-                if (uploadBufferList.size() != 1) {
-                    throw new IllegalArgumentException("uploadBufferList size 
in INDEX type input stream must be 1");
+                if (bufferList.size() != 1) {
+                    throw new IllegalArgumentException("buffer block size must 
be 1 when file type is IndexFile");
                 }
-                return new TieredFileSegmentInputStream(fileType, 
uploadBufferList, contentLength);
+                return new FileSegmentInputStream(fileType, bufferList, 
length);
             default:
-                throw new IllegalArgumentException("fileType is not 
supported");
+                throw new IllegalArgumentException("file type is not 
supported");
         }
     }
 }
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
index 8601392e74..2451199c28 100644
--- 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java
@@ -130,36 +130,36 @@ public class TieredMessageStoreTest {
         // TieredStorageLevel.DISABLE
         properties.setProperty("tieredStorageLevel", "0");
         configuration.update(properties);
-        Assert.assertFalse(store.viaTieredStorage(mq.getTopic(), 
mq.getQueueId(), 0));
+        Assert.assertFalse(store.fetchFromCurrentStore(mq.getTopic(), 
mq.getQueueId(), 0));
 
         // TieredStorageLevel.NOT_IN_DISK
         properties.setProperty("tieredStorageLevel", "1");
         configuration.update(properties);
         when(nextStore.checkInStoreByConsumeOffset(anyString(), anyInt(), 
anyLong())).thenReturn(false);
-        Assert.assertTrue(store.viaTieredStorage(mq.getTopic(), 
mq.getQueueId(), 0));
+        Assert.assertTrue(store.fetchFromCurrentStore(mq.getTopic(), 
mq.getQueueId(), 0));
 
         when(nextStore.checkInStoreByConsumeOffset(anyString(), anyInt(), 
anyLong())).thenReturn(true);
-        Assert.assertFalse(store.viaTieredStorage(mq.getTopic(), 
mq.getQueueId(), 0));
+        Assert.assertFalse(store.fetchFromCurrentStore(mq.getTopic(), 
mq.getQueueId(), 0));
 
         // TieredStorageLevel.NOT_IN_MEM
         properties.setProperty("tieredStorageLevel", "2");
         configuration.update(properties);
         Mockito.when(nextStore.checkInStoreByConsumeOffset(anyString(), 
anyInt(), anyLong())).thenReturn(false);
         Mockito.when(nextStore.checkInMemByConsumeOffset(anyString(), 
anyInt(), anyLong(), anyInt())).thenReturn(true);
-        Assert.assertTrue(store.viaTieredStorage(mq.getTopic(), 
mq.getQueueId(), 0));
+        Assert.assertTrue(store.fetchFromCurrentStore(mq.getTopic(), 
mq.getQueueId(), 0));
 
         Mockito.when(nextStore.checkInStoreByConsumeOffset(anyString(), 
anyInt(), anyLong())).thenReturn(true);
         Mockito.when(nextStore.checkInMemByConsumeOffset(anyString(), 
anyInt(), anyLong(), anyInt())).thenReturn(false);
-        Assert.assertTrue(store.viaTieredStorage(mq.getTopic(), 
mq.getQueueId(), 0));
+        Assert.assertTrue(store.fetchFromCurrentStore(mq.getTopic(), 
mq.getQueueId(), 0));
 
         Mockito.when(nextStore.checkInStoreByConsumeOffset(anyString(), 
anyInt(), anyLong())).thenReturn(true);
         Mockito.when(nextStore.checkInMemByConsumeOffset(anyString(), 
anyInt(), anyLong(), anyInt())).thenReturn(true);
-        Assert.assertFalse(store.viaTieredStorage(mq.getTopic(), 
mq.getQueueId(), 0));
+        Assert.assertFalse(store.fetchFromCurrentStore(mq.getTopic(), 
mq.getQueueId(), 0));
 
         // TieredStorageLevel.FORCE
         properties.setProperty("tieredStorageLevel", "3");
         configuration.update(properties);
-        Assert.assertTrue(store.viaTieredStorage(mq.getTopic(), 
mq.getQueueId(), 0));
+        Assert.assertTrue(store.fetchFromCurrentStore(mq.getTopic(), 
mq.getQueueId(), 0));
     }
 
     @Test
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileTest.java
index cc39cfbfce..7a4d059690 100644
--- 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileTest.java
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileTest.java
@@ -24,6 +24,7 @@ import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.tieredstore.TieredStoreTestUtil;
 import org.apache.rocketmq.tieredstore.common.FileSegmentType;
 import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
 import org.apache.rocketmq.tieredstore.metadata.FileSegmentMetadata;
 import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
 import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
@@ -55,6 +56,7 @@ public class TieredFlatFileTest {
     public void tearDown() throws IOException {
         TieredStoreTestUtil.destroyMetadataStore();
         TieredStoreTestUtil.destroyTempDir(storePath);
+        TieredStoreExecutor.shutdown();
     }
 
     private List<FileSegmentMetadata> 
getSegmentMetadataList(TieredMetadataStore metadataStore) {
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java
index 262d6645b3..2da72bc7a7 100644
--- 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java
@@ -87,5 +87,7 @@ public class TieredIndexFileTest {
 
         indexList = indexFile.queryAsync(mq.getTopic(), "key1", 1000, 
1200).join();
         Assert.assertEquals(1, indexList.size());
+
+        indexFile.destroy();
     }
 }
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/MockTieredFileSegmentInputStream.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/MockFileSegmentInputStream.java
similarity index 82%
rename from 
tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/MockTieredFileSegmentInputStream.java
rename to 
tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/MockFileSegmentInputStream.java
index a6566b7de5..3bbe41dd4b 100644
--- 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/MockTieredFileSegmentInputStream.java
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/MockFileSegmentInputStream.java
@@ -20,13 +20,13 @@ package org.apache.rocketmq.tieredstore.provider;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.List;
-import 
org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
+import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream;
 
-public class MockTieredFileSegmentInputStream extends 
TieredFileSegmentInputStream {
+public class MockFileSegmentInputStream extends FileSegmentInputStream {
 
     private final InputStream inputStream;
 
-    public MockTieredFileSegmentInputStream(InputStream inputStream) {
+    public MockFileSegmentInputStream(InputStream inputStream) {
         super(null, null, Integer.MAX_VALUE);
         this.inputStream = inputStream;
     }
@@ -43,7 +43,7 @@ public class MockTieredFileSegmentInputStream extends 
TieredFileSegmentInputStre
     }
 
     @Override
-    public List<ByteBuffer> getUploadBufferList() {
+    public List<ByteBuffer> getBufferList() {
         return null;
     }
 
diff --git 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentInputStreamTest.java
 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentInputStreamTest.java
index a2554ba3d1..743d9182ce 100644
--- 
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentInputStreamTest.java
+++ 
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentInputStreamTest.java
@@ -28,8 +28,8 @@ import java.util.Random;
 import org.apache.rocketmq.tieredstore.common.FileSegmentType;
 import org.apache.rocketmq.tieredstore.file.TieredCommitLog;
 import org.apache.rocketmq.tieredstore.file.TieredConsumeQueue;
-import 
org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
-import 
org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStreamFactory;
+import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream;
+import 
org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStreamFactory;
 import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
 import org.apache.rocketmq.tieredstore.util.MessageBufferUtilTest;
 import org.junit.Assert;
@@ -57,7 +57,7 @@ public class TieredFileSegmentInputStreamTest {
             bufferSize += byteBuffer.remaining();
         }
 
-        // build expected byte buffer for verifying the 
TieredFileSegmentInputStream
+        // build expected byte buffer for verifying the FileSegmentInputStream
         ByteBuffer expectedByteBuffer = ByteBuffer.allocate(bufferSize);
         for (ByteBuffer byteBuffer : uploadBufferList) {
             expectedByteBuffer.put(byteBuffer);
@@ -74,7 +74,7 @@ public class TieredFileSegmentInputStreamTest {
         int[] batchReadSizeTestSet = {
             MessageBufferUtil.PHYSICAL_OFFSET_POSITION - 1, 
MessageBufferUtil.PHYSICAL_OFFSET_POSITION, 
MessageBufferUtil.PHYSICAL_OFFSET_POSITION + 1, MSG_LEN - 1, MSG_LEN, MSG_LEN + 
1
         };
-        verifyReadAndReset(expectedByteBuffer, () -> 
TieredFileSegmentInputStreamFactory.build(
+        verifyReadAndReset(expectedByteBuffer, () -> 
FileSegmentInputStreamFactory.build(
             FileSegmentType.COMMIT_LOG, COMMIT_LOG_START_OFFSET, 
uploadBufferList, null, finalBufferSize), finalBufferSize, 
batchReadSizeTestSet);
 
     }
@@ -98,7 +98,7 @@ public class TieredFileSegmentInputStreamTest {
         int codaBufferSize = codaBuffer.remaining();
         bufferSize += codaBufferSize;
 
-        // build expected byte buffer for verifying the 
TieredFileSegmentInputStream
+        // build expected byte buffer for verifying the FileSegmentInputStream
         ByteBuffer expectedByteBuffer = ByteBuffer.allocate(bufferSize);
         for (ByteBuffer byteBuffer : uploadBufferList) {
             expectedByteBuffer.put(byteBuffer);
@@ -119,7 +119,7 @@ public class TieredFileSegmentInputStreamTest {
             MSG_LEN - 1, MSG_LEN, MSG_LEN + 1,
             bufferSize - 1, bufferSize, bufferSize + 1
         };
-        verifyReadAndReset(expectedByteBuffer, () -> 
TieredFileSegmentInputStreamFactory.build(
+        verifyReadAndReset(expectedByteBuffer, () -> 
FileSegmentInputStreamFactory.build(
             FileSegmentType.COMMIT_LOG, COMMIT_LOG_START_OFFSET, 
uploadBufferList, codaBuffer, finalBufferSize), finalBufferSize, 
batchReadSizeTestSet);
 
     }
@@ -134,7 +134,7 @@ public class TieredFileSegmentInputStreamTest {
             bufferSize += byteBuffer.remaining();
         }
 
-        // build expected byte buffer for verifying the 
TieredFileSegmentInputStream
+        // build expected byte buffer for verifying the FileSegmentInputStream
         ByteBuffer expectedByteBuffer = ByteBuffer.allocate(bufferSize);
         for (ByteBuffer byteBuffer : uploadBufferList) {
             expectedByteBuffer.put(byteBuffer);
@@ -143,7 +143,7 @@ public class TieredFileSegmentInputStreamTest {
 
         int finalBufferSize = bufferSize;
         int[] batchReadSizeTestSet = 
{TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE - 1, 
TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE, 
TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE + 1};
-        verifyReadAndReset(expectedByteBuffer, () -> 
TieredFileSegmentInputStreamFactory.build(
+        verifyReadAndReset(expectedByteBuffer, () -> 
FileSegmentInputStreamFactory.build(
             FileSegmentType.CONSUME_QUEUE, COMMIT_LOG_START_OFFSET, 
uploadBufferList, null, finalBufferSize), bufferSize, batchReadSizeTestSet);
     }
 
@@ -156,16 +156,16 @@ public class TieredFileSegmentInputStreamTest {
         byteBuffer.flip();
         List<ByteBuffer> uploadBufferList = Arrays.asList(byteBuffer);
 
-        // build expected byte buffer for verifying the TieredFileSegmentInputStream
+        // build expected byte buffer for verifying the FileSegmentInputStream
         ByteBuffer expectedByteBuffer = byteBuffer.slice();
 
-        verifyReadAndReset(expectedByteBuffer, () -> TieredFileSegmentInputStreamFactory.build(
+        verifyReadAndReset(expectedByteBuffer, () -> FileSegmentInputStreamFactory.build(
             FileSegmentType.INDEX, COMMIT_LOG_START_OFFSET, uploadBufferList, null, byteBuffer.limit()), byteBuffer.limit(), new int[] {23, 24, 25});
     }
 
-    private void verifyReadAndReset(ByteBuffer expectedByteBuffer, Supplier<TieredFileSegmentInputStream> constructor,
+    private void verifyReadAndReset(ByteBuffer expectedByteBuffer, Supplier<FileSegmentInputStream> constructor,
         int bufferSize, int[] readBatchSizeTestSet) {
-        TieredFileSegmentInputStream inputStream = constructor.get();
+        FileSegmentInputStream inputStream = constructor.get();
 
         // verify
         verifyInputStream(inputStream, expectedByteBuffer);
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java
index 4cd83e0d26..a655710a50 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java
@@ -116,13 +116,22 @@ public class TieredFileSegmentTest {
     }
 
     @Test
-    public void testCommitFailed() {
+    public void testCommitFailedThenSuccess() {
         long startTime = System.currentTimeMillis();
         MemoryFileSegment segment = (MemoryFileSegment) createFileSegment(FileSegmentType.COMMIT_LOG);
         long lastSize = segment.getSize();
-        segment.append(MessageBufferUtilTest.buildMockedMessageBuffer(), 0);
-        segment.append(MessageBufferUtilTest.buildMockedMessageBuffer(), 0);
-
+        segment.setCheckSize(false);
+        segment.initPosition(lastSize);
+        segment.setSize((int) lastSize);
+
+        ByteBuffer buffer1 = MessageBufferUtilTest.buildMockedMessageBuffer().putLong(
+            MessageBufferUtil.PHYSICAL_OFFSET_POSITION, baseOffset + lastSize);
+        ByteBuffer buffer2 = MessageBufferUtilTest.buildMockedMessageBuffer().putLong(
+            MessageBufferUtil.PHYSICAL_OFFSET_POSITION, baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN);
+        segment.append(buffer1, 0);
+        segment.append(buffer2, 0);
+
+        // Mock new message arrive
         segment.blocker = new CompletableFuture<>();
         new Thread(() -> {
             try {
@@ -131,20 +140,88 @@ public class TieredFileSegmentTest {
                 Assert.fail(e.getMessage());
             }
             ByteBuffer buffer = MessageBufferUtilTest.buildMockedMessageBuffer();
+            buffer.putLong(MessageBufferUtil.PHYSICAL_OFFSET_POSITION, MessageBufferUtilTest.MSG_LEN * 2);
             buffer.putLong(MessageBufferUtil.STORE_TIMESTAMP_POSITION, startTime);
             segment.append(buffer, 0);
             segment.blocker.complete(false);
         }).start();
 
+        // Commit failed
         segment.commit();
         segment.blocker.join();
+        segment.blocker = null;
+
+        // Copy data and assume commit success
+        segment.getMemStore().put(buffer1);
+        segment.getMemStore().put(buffer2);
+        segment.setSize((int) (lastSize + MessageBufferUtilTest.MSG_LEN * 2));
 
-        segment.blocker = new CompletableFuture<>();
-        segment.blocker.complete(true);
         segment.commit();
+        Assert.assertEquals(lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getCommitPosition());
+        Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getCommitOffset());
+        Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getMaxOffset());
+
+        ByteBuffer msg1 = segment.read(lastSize, MessageBufferUtilTest.MSG_LEN);
+        Assert.assertEquals(baseOffset + lastSize, MessageBufferUtil.getCommitLogOffset(msg1));
+
+        ByteBuffer msg2 = segment.read(lastSize + MessageBufferUtilTest.MSG_LEN, MessageBufferUtilTest.MSG_LEN);
+        Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN, MessageBufferUtil.getCommitLogOffset(msg2));
+
+        ByteBuffer msg3 = segment.read(lastSize + MessageBufferUtilTest.MSG_LEN * 2, MessageBufferUtilTest.MSG_LEN);
+        Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 2, MessageBufferUtil.getCommitLogOffset(msg3));
+    }
+
+    @Test
+    public void testCommitFailed3Times() {
+        long startTime = System.currentTimeMillis();
+        MemoryFileSegment segment = (MemoryFileSegment) createFileSegment(FileSegmentType.COMMIT_LOG);
+        long lastSize = segment.getSize();
+        segment.setCheckSize(false);
+        segment.initPosition(lastSize);
+        segment.setSize((int) lastSize);
+
+        ByteBuffer buffer1 = MessageBufferUtilTest.buildMockedMessageBuffer().putLong(
+            MessageBufferUtil.PHYSICAL_OFFSET_POSITION, baseOffset + lastSize);
+        ByteBuffer buffer2 = MessageBufferUtilTest.buildMockedMessageBuffer().putLong(
+            MessageBufferUtil.PHYSICAL_OFFSET_POSITION, baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN);
+        segment.append(buffer1, 0);
+        segment.append(buffer2, 0);
+
+        // Mock new message arrive
+        segment.blocker = new CompletableFuture<>();
+        new Thread(() -> {
+            try {
+                Thread.sleep(3000);
+            } catch (InterruptedException e) {
+                Assert.fail(e.getMessage());
+            }
+            ByteBuffer buffer = MessageBufferUtilTest.buildMockedMessageBuffer();
+            buffer.putLong(MessageBufferUtil.PHYSICAL_OFFSET_POSITION, MessageBufferUtilTest.MSG_LEN * 2);
+            buffer.putLong(MessageBufferUtil.STORE_TIMESTAMP_POSITION, startTime);
+            segment.append(buffer, 0);
+            segment.blocker.complete(false);
+        }).start();
+
+        for (int i = 0; i < 3; i++) {
+            segment.commit();
+        }
+
+        Assert.assertEquals(lastSize, segment.getCommitPosition());
+        Assert.assertEquals(baseOffset + lastSize, segment.getCommitOffset());
+        Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getMaxOffset());
+
+        segment.blocker.join();
+        segment.blocker = null;
 
+        segment.commit();
+        Assert.assertEquals(lastSize + MessageBufferUtilTest.MSG_LEN * 2, segment.getCommitPosition());
+        Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 2, segment.getCommitOffset());
         Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getMaxOffset());
+
+        segment.commit();
+        Assert.assertEquals(lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getCommitPosition());
         Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getCommitOffset());
+        Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getMaxOffset());
 
         ByteBuffer msg1 = segment.read(lastSize, MessageBufferUtilTest.MSG_LEN);
         Assert.assertEquals(baseOffset + lastSize, MessageBufferUtil.getCommitLogOffset(msg1));
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegment.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegment.java
index cb155cf8f7..80ad41f685 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegment.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegment.java
@@ -23,7 +23,7 @@ import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.tieredstore.common.FileSegmentType;
 import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
 import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
-import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
+import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream;
 import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
 import org.junit.Assert;
 
@@ -33,6 +33,8 @@ public class MemoryFileSegment extends TieredFileSegment {
 
     public CompletableFuture<Boolean> blocker;
 
+    protected int size = 0;
+
     protected boolean checkSize = true;
 
     public MemoryFileSegment(FileSegmentType fileType, MessageQueue messageQueue, long baseOffset,
@@ -56,6 +58,18 @@ public class MemoryFileSegment extends TieredFileSegment {
         memStore.position((int) getSize());
     }
 
+    public boolean isCheckSize() {
+        return checkSize;
+    }
+
+    public void setCheckSize(boolean checkSize) {
+        this.checkSize = checkSize;
+    }
+
+    public ByteBuffer getMemStore() {
+        return memStore;
+    }
+
     @Override
     public String getPath() {
         return filePath;
@@ -66,7 +80,11 @@ public class MemoryFileSegment extends TieredFileSegment {
         if (checkSize) {
             return 1000;
         }
-        return 0;
+        return size;
+    }
+
+    public void setSize(int size) {
+        this.size = size;
     }
 
     @Override
@@ -85,11 +103,11 @@ public class MemoryFileSegment extends TieredFileSegment {
 
     @Override
     public CompletableFuture<Boolean> commit0(
-        TieredFileSegmentInputStream inputStream, long position, int length, boolean append) {
+        FileSegmentInputStream inputStream, long position, int length, boolean append) {
 
         try {
             if (blocker != null && !blocker.get()) {
-                throw new IllegalStateException();
+                throw new IllegalStateException("Commit Exception for Memory Test");
             }
         } catch (InterruptedException | ExecutionException e) {
             Assert.fail(e.getMessage());
@@ -98,7 +116,6 @@ public class MemoryFileSegment extends TieredFileSegment {
         Assert.assertTrue(!checkSize || position >= getSize());
 
         byte[] buffer = new byte[1024];
-
         int startPos = memStore.position();
         try {
             int len;
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegmentWithoutCheck.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegmentWithoutCheck.java
index 8ac330b370..630fd22236 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegmentWithoutCheck.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegmentWithoutCheck.java
@@ -22,7 +22,7 @@ import java.util.concurrent.ExecutionException;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.tieredstore.common.FileSegmentType;
 import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
-import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream;
+import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream;
 import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
 import org.junit.Assert;
 
@@ -46,7 +46,7 @@ public class MemoryFileSegmentWithoutCheck extends MemoryFileSegment {
     }
 
     @Override
-    public CompletableFuture<Boolean> commit0(TieredFileSegmentInputStream inputStream, long position, int length,
+    public CompletableFuture<Boolean> commit0(FileSegmentInputStream inputStream, long position, int length,
         boolean append) {
         try {
             if (blocker != null && !blocker.get()) {

Reply via email to