This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new c78061bf6c [ISSUE#7280] Fix and refactor handle commit exception in tiered storage (#7281) c78061bf6c is described below commit c78061bf6ca5f35452510ec4107c46735c51c316 Author: lizhimins <707364...@qq.com> AuthorDate: Wed Aug 30 22:29:51 2023 +0800 [ISSUE#7280] Fix and refactor handle commit exception in tiered storage (#7281) * refactor handle commit exception * refactor handle commit exception * fix handle commit exception --- .../rocketmq/tieredstore/TieredDispatcher.java | 3 +- .../rocketmq/tieredstore/TieredMessageFetcher.java | 57 ++-- .../rocketmq/tieredstore/TieredMessageStore.java | 26 +- .../tieredstore/provider/TieredFileSegment.java | 291 ++++++++++++--------- .../tieredstore/provider/TieredStoreProvider.java | 8 +- .../provider/posix/PosixFileSegment.java | 4 +- .../CommitLogInputStream.java} | 30 ++- .../FileSegmentInputStream.java} | 49 ++-- .../FileSegmentInputStreamFactory.java} | 26 +- .../tieredstore/TieredMessageStoreTest.java | 14 +- .../tieredstore/file/TieredFlatFileTest.java | 2 + .../tieredstore/file/TieredIndexFileTest.java | 2 + ...Stream.java => MockFileSegmentInputStream.java} | 8 +- .../provider/TieredFileSegmentInputStreamTest.java | 24 +- .../provider/TieredFileSegmentTest.java | 89 ++++++- .../provider/memory/MemoryFileSegment.java | 27 +- .../memory/MemoryFileSegmentWithoutCheck.java | 4 +- 17 files changed, 427 insertions(+), 237 deletions(-) diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java index 1746190cdb..430c2b62eb 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java @@ -318,8 +318,7 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch continue; case FILE_CLOSED: tieredFlatFileManager.destroyCompositeFile(flatFile.getMessageQueue()); - logger.info("TieredDispatcher#dispatchFlatFile: file has been close and destroy, " + - "topic: {}, queueId: {}", topic, queueId); + logger.info("File has been closed and destroy, topic: {}, queueId: {}", topic, queueId); return; default: dispatchOffset--; diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java index 9a9a3e5a5c..766ff64f6c 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java @@ -273,15 +273,17 @@ public class TieredMessageFetcher implements MessageStoreFetcher { TieredStoreMetricsManager.cacheHit.add(resultWrapperList.size(), attributes); } - // if no cached message found and there is currently an inflight request, wait for the request to end before continuing + // If there are no messages in the cache and there are currently requests being pulled. + // We need to wait for the request to return before continuing. if (resultWrapperList.isEmpty() && waitInflightRequest) { - CompletableFuture<Long> future = flatFile.getInflightRequest(group, queueOffset, maxCount) - .getFuture(queueOffset); + CompletableFuture<Long> future = + flatFile.getInflightRequest(group, queueOffset, maxCount).getFuture(queueOffset); if (!future.isDone()) { Stopwatch stopwatch = Stopwatch.createStarted(); // to prevent starvation issues, only allow waiting for inflight request once return future.thenCompose(v -> { - LOGGER.debug("TieredMessageFetcher#getMessageFromCacheAsync: wait for inflight request cost: {}ms", stopwatch.elapsed(TimeUnit.MILLISECONDS)); + LOGGER.debug("MessageFetcher#getMessageFromCacheAsync: wait for response cost: {}ms", + stopwatch.elapsed(TimeUnit.MILLISECONDS)); return getMessageFromCacheAsync(flatFile, group, queueOffset, maxCount, false); }); } @@ -302,7 +304,8 @@ public class TieredMessageFetcher implements MessageStoreFetcher { // if cache hit, result will be returned immediately and asynchronously prefetch messages for later requests if (!resultWrapperList.isEmpty()) { - LOGGER.debug("TieredMessageFetcher#getMessageFromCacheAsync: cache hit: topic: {}, queue: {}, queue offset: {}, max message num: {}, cache hit num: {}", + LOGGER.debug("MessageFetcher#getMessageFromCacheAsync: cache hit: " + + "topic: {}, queue: {}, queue offset: {}, max message num: {}, cache hit num: {}", mq.getTopic(), mq.getQueueId(), queueOffset, maxCount, resultWrapperList.size()); prefetchMessage(flatFile, group, maxCount, lastGetOffset + 1); @@ -316,8 +319,10 @@ public class TieredMessageFetcher implements MessageStoreFetcher { } // if cache is miss, immediately pull messages - LOGGER.warn("TieredMessageFetcher#getMessageFromCacheAsync: cache miss: topic: {}, queue: {}, queue offset: {}, max message num: {}", + LOGGER.warn("TieredMessageFetcher#getMessageFromCacheAsync: cache miss: " + + "topic: {}, queue: {}, queue offset: {}, max message num: {}", mq.getTopic(), mq.getQueueId(), queueOffset, maxCount); + CompletableFuture<GetMessageResult> resultFuture; synchronized (flatFile) { int batchSize = maxCount * storeConfig.getReadAheadMinFactor(); @@ -453,42 +458,42 @@ public class TieredMessageFetcher implements MessageStoreFetcher { public CompletableFuture<GetMessageResult> getMessageAsync( String group, String topic, int queueId, long queueOffset, int maxCount, final MessageFilter messageFilter) { + GetMessageResult result = new GetMessageResult(); CompositeQueueFlatFile flatFile = flatFileManager.getFlatFile(new MessageQueue(topic, brokerName, queueId)); + if (flatFile == null) { - GetMessageResult result = new GetMessageResult(); result.setNextBeginOffset(queueOffset); result.setStatus(GetMessageStatus.NO_MATCHED_LOGIC_QUEUE); return CompletableFuture.completedFuture(result); } - GetMessageResult result = new GetMessageResult(); - long minQueueOffset = flatFile.getConsumeQueueMinOffset(); - long maxQueueOffset = flatFile.getConsumeQueueCommitOffset(); - result.setMinOffset(minQueueOffset); - result.setMaxOffset(maxQueueOffset); + // Max queue offset means next message put position + result.setMinOffset(flatFile.getConsumeQueueMinOffset()); + result.setMaxOffset(flatFile.getConsumeQueueCommitOffset()); + + // Fill result according file offset. + // Offset range | Result | Fix to + // (-oo, 0] | no message | current offset + // (0, min) | too small | min offset + // [min, max) | correct | + // [max, max] | overflow one | max offset + // (max, +oo) | overflow badly | max offset - if (flatFile.getConsumeQueueCommitOffset() <= 0) { + if (result.getMaxOffset() <= 0) { result.setStatus(GetMessageStatus.NO_MESSAGE_IN_QUEUE); result.setNextBeginOffset(queueOffset); return CompletableFuture.completedFuture(result); - } - - // request range | result - // (0, min) | too small - // [min, max) | correct - // [max, max] | overflow one - // (max, +oo) | overflow badly - if (queueOffset < minQueueOffset) { + } else if (queueOffset < result.getMinOffset()) { result.setStatus(GetMessageStatus.OFFSET_TOO_SMALL); - result.setNextBeginOffset(flatFile.getConsumeQueueMinOffset()); + result.setNextBeginOffset(result.getMinOffset()); return CompletableFuture.completedFuture(result); - } else if (queueOffset == maxQueueOffset) { + } else if (queueOffset == result.getMaxOffset()) { result.setStatus(GetMessageStatus.OFFSET_OVERFLOW_ONE); - result.setNextBeginOffset(flatFile.getConsumeQueueCommitOffset()); + result.setNextBeginOffset(result.getMaxOffset()); return CompletableFuture.completedFuture(result); - } else if (queueOffset > maxQueueOffset) { + } else if (queueOffset > result.getMaxOffset()) { result.setStatus(GetMessageStatus.OFFSET_OVERFLOW_BADLY); - result.setNextBeginOffset(flatFile.getConsumeQueueCommitOffset()); + result.setNextBeginOffset(result.getMaxOffset()); return CompletableFuture.completedFuture(result); } diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java index 5240ac8e9e..78e855f365 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java @@ -99,11 +99,11 @@ public class TieredMessageStore extends AbstractPluginMessageStore { return storeConfig; } - public boolean viaTieredStorage(String topic, int queueId, long offset) { - return viaTieredStorage(topic, queueId, offset, 1); + public boolean fetchFromCurrentStore(String topic, int queueId, long offset) { + return fetchFromCurrentStore(topic, queueId, offset, 1); } - public boolean viaTieredStorage(String topic, int queueId, long offset, int batchSize) { + public boolean fetchFromCurrentStore(String topic, int queueId, long offset, int batchSize) { TieredMessageStoreConfig.TieredStorageLevel deepStorageLevel = storeConfig.getTieredStorageLevel(); if (deepStorageLevel.check(TieredMessageStoreConfig.TieredStorageLevel.FORCE)) { @@ -146,8 +146,10 @@ public class TieredMessageStore extends AbstractPluginMessageStore { public CompletableFuture<GetMessageResult> getMessageAsync(String group, String topic, int queueId, long offset, int maxMsgNums, MessageFilter messageFilter) { - if (!viaTieredStorage(topic, queueId, offset, maxMsgNums)) { - logger.trace("GetMessageAsync from next store topic: {}, queue: {}, offset: {}", topic, queueId, offset); + if (fetchFromCurrentStore(topic, queueId, offset, maxMsgNums)) { + logger.trace("GetMessageAsync from current store, topic: {}, queue: {}, offset: {}", topic, queueId, offset); + } else { + logger.trace("GetMessageAsync from next store, topic: {}, queue: {}, offset: {}", topic, queueId, offset); return next.getMessageAsync(group, topic, queueId, offset, maxMsgNums, messageFilter); } @@ -168,14 +170,14 @@ public class TieredMessageStore extends AbstractPluginMessageStore { if (next.checkInStoreByConsumeOffset(topic, queueId, offset)) { TieredStoreMetricsManager.fallbackTotal.add(1, latencyAttributes); - logger.debug("GetMessageAsync not found then try back to next store, result: {}, " + + logger.debug("GetMessageAsync not found, then back to next store, result: {}, " + "topic: {}, queue: {}, queue offset: {}, offset range: {}-{}", result.getStatus(), topic, queueId, offset, result.getMinOffset(), result.getMaxOffset()); return next.getMessage(group, topic, queueId, offset, maxMsgNums, messageFilter); } } - // system topic + // Fetch system topic data from the broker when using the force level. if (result.getStatus() == GetMessageStatus.NO_MATCHED_LOGIC_QUEUE) { if (TieredStoreUtil.isSystemTopic(topic) || PopAckConstants.isStartWithRevivePrefix(topic)) { return next.getMessage(group, topic, queueId, offset, maxMsgNums, messageFilter); @@ -198,7 +200,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore { TieredStoreMetricsManager.messagesOutTotal.add(result.getMessageCount(), messagesOutAttributes); } - // fix min or max offset according next store + // Fix min or max offset according next store at last long minOffsetInQueue = next.getMinOffsetInQueue(topic, queueId); if (minOffsetInQueue >= 0 && minOffsetInQueue < result.getMinOffset()) { result.setMinOffset(minOffsetInQueue); @@ -209,7 +211,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore { } return result; }).exceptionally(e -> { - logger.error("GetMessageAsync from tiered store failed: ", e); + logger.error("GetMessageAsync from tiered store failed", e); return next.getMessage(group, topic, queueId, offset, maxMsgNums, messageFilter); }); } @@ -251,7 +253,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore { .build(); TieredStoreMetricsManager.apiLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS), latencyAttributes); if (time < 0) { - logger.debug("TieredMessageStore#getEarliestMessageTimeAsync: get earliest message time failed, try to get earliest message time from next store: topic: {}, queue: {}", + logger.debug("GetEarliestMessageTimeAsync failed, try to get earliest message time from next store: topic: {}, queue: {}", topic, queueId); return finalNextEarliestMessageTime != Long.MAX_VALUE ? finalNextEarliestMessageTime : -1; } @@ -262,7 +264,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore { @Override public CompletableFuture<Long> getMessageStoreTimeStampAsync(String topic, int queueId, long consumeQueueOffset) { - if (viaTieredStorage(topic, queueId, consumeQueueOffset)) { + if (fetchFromCurrentStore(topic, queueId, consumeQueueOffset)) { Stopwatch stopwatch = Stopwatch.createStarted(); return fetcher.getMessageStoreTimeStampAsync(topic, queueId, consumeQueueOffset) .thenApply(time -> { @@ -272,7 +274,7 @@ public class TieredMessageStore extends AbstractPluginMessageStore { .build(); TieredStoreMetricsManager.apiLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS), latencyAttributes); if (time == -1) { - logger.debug("TieredMessageStore#getMessageStoreTimeStampAsync: get message time failed, try to get message time from next store: topic: {}, queue: {}, queue offset: {}", + logger.debug("GetEarliestMessageTimeAsync failed, try to get message time from next store, topic: {}, queue: {}, queue offset: {}", topic, queueId, consumeQueueOffset); return next.getMessageStoreTimeStamp(topic, queueId, consumeQueueOffset); } diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java index 5062c7d9e7..32911a6e89 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java @@ -16,14 +16,11 @@ */ package org.apache.rocketmq.tieredstore.provider; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Stopwatch; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; @@ -35,8 +32,8 @@ import org.apache.rocketmq.tieredstore.exception.TieredStoreException; import org.apache.rocketmq.tieredstore.file.TieredCommitLog; import org.apache.rocketmq.tieredstore.file.TieredConsumeQueue; import org.apache.rocketmq.tieredstore.file.TieredIndexFile; -import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream; -import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStreamFactory; +import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream; +import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStreamFactory; import org.apache.rocketmq.tieredstore.util.MessageBufferUtil; import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; @@ -50,22 +47,23 @@ public abstract class TieredFileSegment implements Comparable<TieredFileSegment> protected final TieredMessageStoreConfig storeConfig; private final long maxSize; - private final ReentrantLock bufferLock; - private final Semaphore commitLock; + private final ReentrantLock bufferLock = new ReentrantLock(); + private final Semaphore commitLock = new Semaphore(1); - private volatile boolean full; - private volatile boolean closed; + private volatile boolean full = false; + private volatile boolean closed = false; - private volatile long minTimestamp; - private volatile long maxTimestamp; - private volatile long commitPosition; - private volatile long appendPosition; + private volatile long minTimestamp = Long.MAX_VALUE; + private volatile long maxTimestamp = Long.MAX_VALUE; + private volatile long commitPosition = 0L; + private volatile long appendPosition = 0L; // only used in commitLog - private volatile long dispatchCommitOffset = 0; + private volatile long dispatchCommitOffset = 0L; private ByteBuffer codaBuffer; - private List<ByteBuffer> uploadBufferList = new ArrayList<>(); + private List<ByteBuffer> bufferList = new ArrayList<>(); + private FileSegmentInputStream fileSegmentInputStream; private CompletableFuture<Boolean> flightCommitRequest = CompletableFuture.completedFuture(false); public TieredFileSegment(TieredMessageStoreConfig storeConfig, @@ -75,21 +73,13 @@ public abstract class TieredFileSegment implements Comparable<TieredFileSegment> this.fileType = fileType; this.filePath = filePath; this.baseOffset = baseOffset; - - this.closed = false; - this.bufferLock = new ReentrantLock(); - this.commitLock = new Semaphore(1); - - this.commitPosition = 0L; - this.appendPosition = 0L; - this.minTimestamp = Long.MAX_VALUE; - this.maxTimestamp = Long.MAX_VALUE; - - // The max segment size of a file is determined by the file type - this.maxSize = getMaxSizeAccordingFileType(storeConfig); + this.maxSize = getMaxSizeByFileType(); } - private long getMaxSizeAccordingFileType(TieredMessageStoreConfig storeConfig) { + /** + * The max segment size of a file is determined by the file type + */ + protected long getMaxSizeByFileType() { switch (fileType) { case COMMIT_LOG: return storeConfig.getTieredStoreCommitLogMaxSize(); @@ -184,39 +174,23 @@ public abstract class TieredFileSegment implements Comparable<TieredFileSegment> this.appendPosition = pos; } - private List<ByteBuffer> rollingUploadBuffer() { + private List<ByteBuffer> borrowBuffer() { bufferLock.lock(); try { - List<ByteBuffer> tmp = uploadBufferList; - uploadBufferList = new ArrayList<>(); + List<ByteBuffer> tmp = bufferList; + bufferList = new ArrayList<>(); return tmp; } finally { bufferLock.unlock(); } } - private void sendBackBuffer(TieredFileSegmentInputStream inputStream) { - bufferLock.lock(); - try { - List<ByteBuffer> tmpBufferList = inputStream.getUploadBufferList(); - for (ByteBuffer buffer : tmpBufferList) { - buffer.rewind(); - } - tmpBufferList.addAll(uploadBufferList); - uploadBufferList = tmpBufferList; - if (inputStream.getCodaBuffer() != null) { - codaBuffer.rewind(); - } - } finally { - bufferLock.unlock(); - } - } - @SuppressWarnings("NonAtomicOperationOnVolatileField") - public AppendResult append(ByteBuffer byteBuf, long timeStamp) { + public AppendResult append(ByteBuffer byteBuf, long timestamp) { if (closed) { return AppendResult.FILE_CLOSED; } + bufferLock.lock(); try { if (full || codaBuffer != null) { @@ -227,7 +201,8 @@ public abstract class TieredFileSegment implements Comparable<TieredFileSegment> minTimestamp = byteBuf.getLong(TieredIndexFile.INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION); maxTimestamp = byteBuf.getLong(TieredIndexFile.INDEX_FILE_HEADER_END_TIME_STAMP_POSITION); appendPosition += byteBuf.remaining(); - uploadBufferList.add(byteBuf); + // IndexFile is large and not change after compaction, no need deep copy + bufferList.add(byteBuf); setFull(); return AppendResult.SUCCESS; } @@ -236,23 +211,34 @@ public abstract class TieredFileSegment implements Comparable<TieredFileSegment> setFull(); return AppendResult.FILE_FULL; } - if (uploadBufferList.size() > storeConfig.getTieredStoreGroupCommitCount() + + if (bufferList.size() > storeConfig.getTieredStoreGroupCommitCount() || appendPosition - commitPosition > storeConfig.getTieredStoreGroupCommitSize()) { commitAsync(); } - if (uploadBufferList.size() > storeConfig.getTieredStoreMaxGroupCommitCount()) { - logger.debug("TieredFileSegment#append: buffer full: file: {}, upload buffer size: {}", - getPath(), uploadBufferList.size()); + + if (bufferList.size() > storeConfig.getTieredStoreMaxGroupCommitCount()) { + logger.debug("File segment append buffer full, file: {}, buffer size: {}, pending bytes: {}", + getPath(), bufferList.size(), appendPosition - commitPosition); return AppendResult.BUFFER_FULL; } - if (timeStamp != Long.MAX_VALUE) { - maxTimestamp = timeStamp; + + if (timestamp != Long.MAX_VALUE) { + maxTimestamp = timestamp; if (minTimestamp == Long.MAX_VALUE) { - minTimestamp = timeStamp; + minTimestamp = timestamp; } } + appendPosition += byteBuf.remaining(); - uploadBufferList.add(byteBuf); + + // deep copy buffer + ByteBuffer byteBuffer = ByteBuffer.allocateDirect(byteBuf.remaining()); + byteBuffer.put(byteBuf); + byteBuffer.flip(); + byteBuf.rewind(); + + bufferList.add(byteBuffer); return AppendResult.SUCCESS; } finally { bufferLock.unlock(); @@ -267,7 +253,6 @@ public abstract class TieredFileSegment implements Comparable<TieredFileSegment> return appendPosition; } - @VisibleForTesting public void setAppendPosition(long appendPosition) { this.appendPosition = appendPosition; } @@ -333,6 +318,8 @@ public abstract class TieredFileSegment implements Comparable<TieredFileSegment> if (closed) { return false; } + // result is false when we send real commit request + // use join for wait flight request done Boolean result = commitAsync().join(); if (!result) { result = flightCommitRequest.join(); @@ -340,92 +327,156 @@ public abstract class TieredFileSegment implements Comparable<TieredFileSegment> return result; } + private void releaseCommitLock() { + if (commitLock.availablePermits() == 0) { + commitLock.release(); + } else { + logger.error("[Bug] FileSegmentCommitAsync, lock is already released: available permits: {}", + commitLock.availablePermits()); + } + } + + private void updateDispatchCommitOffset(List<ByteBuffer> bufferList) { + if (fileType == FileSegmentType.COMMIT_LOG && bufferList.size() > 0) { + dispatchCommitOffset = + MessageBufferUtil.getQueueOffset(bufferList.get(bufferList.size() - 1)); + } + } + + /** + * @return false: commit, true: no commit operation + */ @SuppressWarnings("NonAtomicOperationOnVolatileField") public CompletableFuture<Boolean> commitAsync() { if (closed) { return CompletableFuture.completedFuture(false); } - Stopwatch stopwatch = Stopwatch.createStarted(); + if (!needCommit()) { return CompletableFuture.completedFuture(true); } - try { - int permits = commitLock.drainPermits(); - if (permits <= 0) { - return CompletableFuture.completedFuture(false); - } - } catch (Exception e) { + + if (commitLock.drainPermits() <= 0) { return CompletableFuture.completedFuture(false); } - List<ByteBuffer> bufferList = rollingUploadBuffer(); - int bufferSize = 0; - for (ByteBuffer buffer : bufferList) { - bufferSize += buffer.remaining(); - } - if (codaBuffer != null) { - bufferSize += codaBuffer.remaining(); - } - if (bufferSize == 0) { - return CompletableFuture.completedFuture(true); - } - TieredFileSegmentInputStream inputStream = TieredFileSegmentInputStreamFactory.build( - fileType, baseOffset + commitPosition, bufferList, codaBuffer, bufferSize); - int finalBufferSize = bufferSize; + try { - flightCommitRequest = commit0(inputStream, commitPosition, bufferSize, fileType != FileSegmentType.INDEX) + if (fileSegmentInputStream != null) { + long fileSize = this.getSize(); + if (fileSize == -1L) { + logger.error("Get commit position error before commit, Commit: %d, Expect: %d, Current Max: %d, FileName: %s", + commitPosition, commitPosition + fileSegmentInputStream.getContentLength(), appendPosition, getPath()); + releaseCommitLock(); + return CompletableFuture.completedFuture(false); + } else { + if (correctPosition(fileSize, null)) { + updateDispatchCommitOffset(fileSegmentInputStream.getBufferList()); + fileSegmentInputStream = null; + } + } + } + + int bufferSize; + if (fileSegmentInputStream != null) { + bufferSize = fileSegmentInputStream.available(); + } else { + List<ByteBuffer> bufferList = borrowBuffer(); + bufferSize = bufferList.stream().mapToInt(ByteBuffer::remaining).sum() + + (codaBuffer != null ? codaBuffer.remaining() : 0); + if (bufferSize == 0) { + releaseCommitLock(); + return CompletableFuture.completedFuture(true); + } + fileSegmentInputStream = FileSegmentInputStreamFactory.build( + fileType, baseOffset + commitPosition, bufferList, codaBuffer, bufferSize); + } + + return flightCommitRequest = this + .commit0(fileSegmentInputStream, commitPosition, bufferSize, fileType != FileSegmentType.INDEX) .thenApply(result -> { if (result) { - if (fileType == FileSegmentType.COMMIT_LOG && bufferList.size() > 0) { - dispatchCommitOffset = MessageBufferUtil.getQueueOffset(bufferList.get(bufferList.size() - 1)); - } - commitPosition += finalBufferSize; + updateDispatchCommitOffset(fileSegmentInputStream.getBufferList()); + commitPosition += bufferSize; + fileSegmentInputStream = null; return true; - } - sendBackBuffer(inputStream); - return false; - }) - .exceptionally(e -> handleCommitException(inputStream, e)) - .whenComplete((result, e) -> { - if (commitLock.availablePermits() == 0) { - logger.debug("TieredFileSegment#commitAsync: commit cost: {}ms, file: {}, item count: {}, buffer size: {}", stopwatch.elapsed(TimeUnit.MILLISECONDS), getPath(), bufferList.size(), finalBufferSize); - commitLock.release(); } else { - logger.error("[Bug]TieredFileSegment#commitAsync: commit lock is already released: available permits: {}", commitLock.availablePermits()); + fileSegmentInputStream.rewind(); + return false; } - }); - return flightCommitRequest; + }) + .exceptionally(this::handleCommitException) + .whenComplete((result, e) -> releaseCommitLock()); + } catch (Exception e) { - handleCommitException(inputStream, e); - if (commitLock.availablePermits() == 0) { - logger.debug("TieredFileSegment#commitAsync: commit cost: {}ms, file: {}, item count: {}, buffer size: {}", stopwatch.elapsed(TimeUnit.MILLISECONDS), getPath(), bufferList.size(), finalBufferSize); - commitLock.release(); - } else { - logger.error("[Bug]TieredFileSegment#commitAsync: commit lock is already released: available permits: {}", commitLock.availablePermits()); - } + handleCommitException(e); + releaseCommitLock(); } return CompletableFuture.completedFuture(false); } - private boolean handleCommitException(TieredFileSegmentInputStream inputStream, Throwable e) { + private long getCorrectFileSize(Throwable throwable) { + if (throwable instanceof TieredStoreException) { + long fileSize = ((TieredStoreException) throwable).getPosition(); + if (fileSize > 0) { + return fileSize; + } + } + return getSize(); + } + + private boolean handleCommitException(Throwable e) { + // Get root cause here Throwable cause = e.getCause() != null ? e.getCause() : e; - sendBackBuffer(inputStream); - long realSize = 0; - if (cause instanceof TieredStoreException && ((TieredStoreException) cause).getPosition() > 0) { - realSize = ((TieredStoreException) cause).getPosition(); + long fileSize = this.getCorrectFileSize(cause); + + if (fileSize == -1L) { + logger.error("Get commit position error, Commit: %d, Expect: %d, Current Max: %d, FileName: %s", + commitPosition, commitPosition + fileSegmentInputStream.getContentLength(), appendPosition, getPath()); + fileSegmentInputStream.rewind(); + return false; + } + + if (correctPosition(fileSize, cause)) { + updateDispatchCommitOffset(fileSegmentInputStream.getBufferList()); + fileSegmentInputStream = null; + return true; + } else { + fileSegmentInputStream.rewind(); + return false; } - if (realSize <= 0) { - realSize = getSize(); + } + + /** + * return true to clear buffer + */ + private boolean correctPosition(long fileSize, Throwable throwable) { + + // Current we have three offsets here: commit offset, expect offset, file size. + // We guarantee that the commit offset is less than or equal to the expect offset. + // Max offset will increase because we can continuously put in new buffers + String handleInfo = throwable == null ? "before commit" : "after commit"; + long expectPosition = commitPosition + fileSegmentInputStream.getContentLength(); + + String offsetInfo = String.format("Correct Commit Position, %s, result=[{}], " + + "Commit: %d, Expect: %d, Current Max: %d, FileSize: %d, FileName: %s", + handleInfo, commitPosition, expectPosition, appendPosition, fileSize, this.getPath()); + + // We are believing that the file size returned by the server is correct, + // can reset the commit offset to the file size reported by the storage system. + if (fileSize == expectPosition) { + logger.info(offsetInfo, "Success", throwable); + commitPosition = fileSize; + return true; } - if (realSize > 0 && realSize > commitPosition) { - logger.error("TieredFileSegment#handleCommitException: commit failed: file: {}, try to fix position: origin: {}, real: {}", getPath(), commitPosition, realSize, cause); - // TODO check if this diff part is uploaded to backend storage - long diff = appendPosition - commitPosition; - commitPosition = realSize; - appendPosition = realSize + diff; - // TODO check if appendPosition is large than maxOffset - } else if (realSize < commitPosition) { - logger.error("[Bug]TieredFileSegment#handleCommitException: commit failed: file: {}, can not fix position: origin: {}, real: {}", getPath(), commitPosition, realSize, cause); + + if (fileSize < commitPosition) { + logger.error(offsetInfo, "FileSizeIncorrect", throwable); + } else if (fileSize == commitPosition) { + logger.warn(offsetInfo, "CommitFailed", throwable); + } else if (fileSize > commitPosition) { + logger.warn(offsetInfo, "PartialSuccess", throwable); } + commitPosition = fileSize; return false; } } diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java index 5a0ca25f59..0db3eaf8f4 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java @@ -18,7 +18,7 @@ package org.apache.rocketmq.tieredstore.provider; import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; -import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream; +import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream; public interface TieredStoreProvider { @@ -30,7 +30,9 @@ public interface TieredStoreProvider { String getPath(); /** - * Get file size in backend file system + * Get the real length of the file. + * Return 0 if the file does not exist, + * Return -1 if system get size failed. * * @return file real size */ @@ -71,5 +73,5 @@ public interface TieredStoreProvider { * @param append try to append or create a new file * @return put result, <code>true</code> if data successfully write; <code>false</code> otherwise */ - CompletableFuture<Boolean> commit0(TieredFileSegmentInputStream inputStream,long position, int length, boolean append); + CompletableFuture<Boolean> commit0(FileSegmentInputStream inputStream,long position, int length, boolean append); } diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java index 52be90b1df..7e949cb28c 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java @@ -36,7 +36,7 @@ import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig; import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor; import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsManager; import org.apache.rocketmq.tieredstore.provider.TieredFileSegment; -import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream; +import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream; import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_FILE_TYPE; @@ -184,7 +184,7 @@ public class PosixFileSegment extends TieredFileSegment { @Override public CompletableFuture<Boolean> commit0( - TieredFileSegmentInputStream inputStream, long position, int length, boolean append) { + FileSegmentInputStream inputStream, long position, int length, boolean append) { Stopwatch stopwatch = Stopwatch.createStarted(); AttributesBuilder attributesBuilder = newAttributesBuilder() diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredCommitLogInputStream.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/CommitLogInputStream.java similarity index 88% rename from tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredCommitLogInputStream.java rename to tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/CommitLogInputStream.java index c70bb76562..13b6e0ef9c 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredCommitLogInputStream.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/CommitLogInputStream.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.rocketmq.tieredstore.provider.inputstream; +package org.apache.rocketmq.tieredstore.provider.stream; import java.io.IOException; import java.nio.ByteBuffer; @@ -23,20 +23,23 @@ import java.util.List; import org.apache.rocketmq.tieredstore.common.FileSegmentType; import org.apache.rocketmq.tieredstore.util.MessageBufferUtil; -public class TieredCommitLogInputStream extends TieredFileSegmentInputStream { +public class CommitLogInputStream extends FileSegmentInputStream { /** * commitLogOffset is the real physical offset of the commitLog buffer which is being read */ + private final long startCommitLogOffset; + private long commitLogOffset; private final ByteBuffer codaBuffer; private long markCommitLogOffset = -1; - public TieredCommitLogInputStream(FileSegmentType fileType, long startOffset, + public CommitLogInputStream(FileSegmentType fileType, long startOffset, List<ByteBuffer> uploadBufferList, ByteBuffer codaBuffer, int contentLength) { super(fileType, uploadBufferList, contentLength); + this.startCommitLogOffset = startOffset; this.commitLogOffset = startOffset; this.codaBuffer = codaBuffer; } @@ -53,6 +56,15 @@ public class TieredCommitLogInputStream extends TieredFileSegmentInputStream { this.commitLogOffset = markCommitLogOffset; } + @Override + public synchronized void rewind() { + super.rewind(); + this.commitLogOffset = this.startCommitLogOffset; + if (this.codaBuffer != null) { + this.codaBuffer.rewind(); + } + } + @Override public ByteBuffer getCodaBuffer() { return this.codaBuffer; @@ -64,17 +76,17 @@ public class TieredCommitLogInputStream extends TieredFileSegmentInputStream { return -1; } readPosition++; - if (curReadBufferIndex >= uploadBufferList.size()) { + if (curReadBufferIndex >= bufferList.size()) { return readCoda(); } int res; if (readPosInCurBuffer >= curBuffer.remaining()) { curReadBufferIndex++; - if (curReadBufferIndex >= uploadBufferList.size()) { + if (curReadBufferIndex >= bufferList.size()) { readPosInCurBuffer = 0; return readCoda(); } - curBuffer = uploadBufferList.get(curReadBufferIndex); + curBuffer = bufferList.get(curReadBufferIndex); commitLogOffset += readPosInCurBuffer; readPosInCurBuffer = 0; } @@ -119,9 +131,9 @@ public class TieredCommitLogInputStream extends TieredFileSegmentInputStream { int posInCurBuffer = readPosInCurBuffer; long curCommitLogOffset = commitLogOffset; ByteBuffer curBuf = curBuffer; - while (needRead > 0 && bufIndex <= uploadBufferList.size()) { + while (needRead > 0 && bufIndex <= bufferList.size()) { int readLen, remaining, realReadLen = 0; - if (bufIndex == uploadBufferList.size()) { + if (bufIndex == bufferList.size()) { // read from coda buffer remaining = codaBuffer.remaining() - posInCurBuffer; readLen = Math.min(remaining, needRead); @@ -137,7 +149,7 @@ public class TieredCommitLogInputStream extends TieredFileSegmentInputStream { } remaining = curBuf.remaining() - posInCurBuffer; readLen = Math.min(remaining, needRead); - curBuf = uploadBufferList.get(bufIndex); + curBuf = bufferList.get(bufIndex); if (posInCurBuffer < MessageBufferUtil.PHYSICAL_OFFSET_POSITION) { realReadLen = Math.min(MessageBufferUtil.PHYSICAL_OFFSET_POSITION - posInCurBuffer, readLen); // read from commitLog buffer diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStream.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/FileSegmentInputStream.java similarity index 77% rename from tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStream.java rename to tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/FileSegmentInputStream.java index e1758ca934..9e9d5135cd 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStream.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/FileSegmentInputStream.java @@ -15,15 +15,16 @@ * limitations under the License. */ -package org.apache.rocketmq.tieredstore.provider.inputstream; +package org.apache.rocketmq.tieredstore.provider.stream; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; import java.util.List; +import org.apache.commons.collections.CollectionUtils; import org.apache.rocketmq.tieredstore.common.FileSegmentType; -public class TieredFileSegmentInputStream extends InputStream { +public class FileSegmentInputStream extends InputStream { /** * file type, can be commitlog, consume queue or indexfile now @@ -33,7 +34,7 @@ public class TieredFileSegmentInputStream extends InputStream { /** * hold bytebuffer */ - protected final List<ByteBuffer> uploadBufferList; + protected final List<ByteBuffer> bufferList; /** * total remaining of bytebuffer list @@ -65,13 +66,13 @@ public class TieredFileSegmentInputStream extends InputStream { private int markReadPosInCurBuffer = -1; - public TieredFileSegmentInputStream(FileSegmentType fileType, List<ByteBuffer> uploadBufferList, - int contentLength) { + public FileSegmentInputStream( + FileSegmentType fileType, List<ByteBuffer> bufferList, int contentLength) { this.fileType = fileType; this.contentLength = contentLength; - this.uploadBufferList = uploadBufferList; - if (uploadBufferList != null && uploadBufferList.size() > 0) { - this.curBuffer = uploadBufferList.get(curReadBufferIndex); + this.bufferList = bufferList; + if (bufferList != null && bufferList.size() > 0) { + this.curBuffer = bufferList.get(curReadBufferIndex); } } @@ -95,18 +96,34 @@ public class TieredFileSegmentInputStream extends InputStream { this.readPosition = markReadPosition; this.curReadBufferIndex = markCurReadBufferIndex; this.readPosInCurBuffer = markReadPosInCurBuffer; - if (this.curReadBufferIndex < uploadBufferList.size()) { - this.curBuffer = uploadBufferList.get(curReadBufferIndex); + if (this.curReadBufferIndex < bufferList.size()) { + this.curBuffer = bufferList.get(curReadBufferIndex); } } + public synchronized void rewind() { + this.readPosition = 0; + this.curReadBufferIndex = 0; + this.readPosInCurBuffer = 0; + if (CollectionUtils.isNotEmpty(bufferList)) { + this.curBuffer = bufferList.get(0); + for (ByteBuffer buffer : bufferList) { + buffer.rewind(); + } + } + } + + public int getContentLength() { + return contentLength; + } + @Override public int available() { return contentLength - readPosition; } - public List<ByteBuffer> getUploadBufferList() { - return uploadBufferList; + public List<ByteBuffer> getBufferList() { + return bufferList; } public ByteBuffer getCodaBuffer() { @@ -121,10 +138,10 @@ public class TieredFileSegmentInputStream extends InputStream { readPosition++; if (readPosInCurBuffer >= curBuffer.remaining()) { curReadBufferIndex++; - if (curReadBufferIndex >= uploadBufferList.size()) { + if (curReadBufferIndex >= bufferList.size()) { return -1; } - curBuffer = uploadBufferList.get(curReadBufferIndex); + curBuffer = bufferList.get(curReadBufferIndex); readPosInCurBuffer = 0; } return curBuffer.get(readPosInCurBuffer++) & 0xff; @@ -153,8 +170,8 @@ public class TieredFileSegmentInputStream extends InputStream { int bufIndex = curReadBufferIndex; int posInCurBuffer = readPosInCurBuffer; ByteBuffer curBuf = curBuffer; - while (needRead > 0 && bufIndex < uploadBufferList.size()) { - curBuf = uploadBufferList.get(bufIndex); + while (needRead > 0 && bufIndex < bufferList.size()) { + curBuf = bufferList.get(bufIndex); int remaining = curBuf.remaining() - posInCurBuffer; int readLen = Math.min(remaining, needRead); // read from curBuf diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStreamFactory.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/FileSegmentInputStreamFactory.java similarity index 54% rename from tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStreamFactory.java rename to tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/FileSegmentInputStreamFactory.java index d0c983fd43..a90baff3ae 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/inputstream/TieredFileSegmentInputStreamFactory.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/stream/FileSegmentInputStreamFactory.java @@ -15,30 +15,34 @@ * limitations under the License. */ -package org.apache.rocketmq.tieredstore.provider.inputstream; +package org.apache.rocketmq.tieredstore.provider.stream; import java.nio.ByteBuffer; import java.util.List; import org.apache.rocketmq.tieredstore.common.FileSegmentType; -public class TieredFileSegmentInputStreamFactory { +public class FileSegmentInputStreamFactory { - public static TieredFileSegmentInputStream build(FileSegmentType fileType, - long startOffset, List<ByteBuffer> uploadBufferList, ByteBuffer codaBuffer, int contentLength) { + public static FileSegmentInputStream build( + FileSegmentType fileType, long offset, List<ByteBuffer> bufferList, ByteBuffer byteBuffer, int length) { + + if (bufferList == null) { + throw new IllegalArgumentException("bufferList is null"); + } switch (fileType) { case COMMIT_LOG: - return new TieredCommitLogInputStream( - fileType, startOffset, uploadBufferList, codaBuffer, contentLength); + return new CommitLogInputStream( + fileType, offset, bufferList, byteBuffer, length); case CONSUME_QUEUE: - return new TieredFileSegmentInputStream(fileType, uploadBufferList, contentLength); + return new FileSegmentInputStream(fileType, bufferList, length); case INDEX: - if (uploadBufferList.size() != 1) { - throw new IllegalArgumentException("uploadBufferList size in INDEX type input stream must be 1"); + if (bufferList.size() != 1) { + throw new IllegalArgumentException("buffer block size must be 1 when file type is IndexFile"); } - return new TieredFileSegmentInputStream(fileType, uploadBufferList, contentLength); + return new FileSegmentInputStream(fileType, bufferList, length); default: - throw new IllegalArgumentException("fileType is not supported"); + throw new IllegalArgumentException("file type is not supported"); } } } diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java index 8601392e74..2451199c28 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java @@ -130,36 +130,36 @@ public class TieredMessageStoreTest { // TieredStorageLevel.DISABLE properties.setProperty("tieredStorageLevel", "0"); configuration.update(properties); - Assert.assertFalse(store.viaTieredStorage(mq.getTopic(), mq.getQueueId(), 0)); + Assert.assertFalse(store.fetchFromCurrentStore(mq.getTopic(), mq.getQueueId(), 0)); // TieredStorageLevel.NOT_IN_DISK properties.setProperty("tieredStorageLevel", "1"); configuration.update(properties); when(nextStore.checkInStoreByConsumeOffset(anyString(), anyInt(), anyLong())).thenReturn(false); - Assert.assertTrue(store.viaTieredStorage(mq.getTopic(), mq.getQueueId(), 0)); + Assert.assertTrue(store.fetchFromCurrentStore(mq.getTopic(), mq.getQueueId(), 0)); when(nextStore.checkInStoreByConsumeOffset(anyString(), anyInt(), anyLong())).thenReturn(true); - Assert.assertFalse(store.viaTieredStorage(mq.getTopic(), mq.getQueueId(), 0)); + Assert.assertFalse(store.fetchFromCurrentStore(mq.getTopic(), mq.getQueueId(), 0)); // TieredStorageLevel.NOT_IN_MEM properties.setProperty("tieredStorageLevel", "2"); configuration.update(properties); Mockito.when(nextStore.checkInStoreByConsumeOffset(anyString(), anyInt(), anyLong())).thenReturn(false); Mockito.when(nextStore.checkInMemByConsumeOffset(anyString(), anyInt(), anyLong(), anyInt())).thenReturn(true); - Assert.assertTrue(store.viaTieredStorage(mq.getTopic(), mq.getQueueId(), 0)); + Assert.assertTrue(store.fetchFromCurrentStore(mq.getTopic(), mq.getQueueId(), 0)); Mockito.when(nextStore.checkInStoreByConsumeOffset(anyString(), anyInt(), anyLong())).thenReturn(true); Mockito.when(nextStore.checkInMemByConsumeOffset(anyString(), anyInt(), anyLong(), anyInt())).thenReturn(false); - Assert.assertTrue(store.viaTieredStorage(mq.getTopic(), mq.getQueueId(), 0)); + Assert.assertTrue(store.fetchFromCurrentStore(mq.getTopic(), mq.getQueueId(), 0)); Mockito.when(nextStore.checkInStoreByConsumeOffset(anyString(), anyInt(), anyLong())).thenReturn(true); Mockito.when(nextStore.checkInMemByConsumeOffset(anyString(), anyInt(), anyLong(), anyInt())).thenReturn(true); - Assert.assertFalse(store.viaTieredStorage(mq.getTopic(), mq.getQueueId(), 0)); + Assert.assertFalse(store.fetchFromCurrentStore(mq.getTopic(), mq.getQueueId(), 0)); // TieredStorageLevel.FORCE properties.setProperty("tieredStorageLevel", "3"); configuration.update(properties); - Assert.assertTrue(store.viaTieredStorage(mq.getTopic(), mq.getQueueId(), 0)); + Assert.assertTrue(store.fetchFromCurrentStore(mq.getTopic(), mq.getQueueId(), 0)); } @Test diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileTest.java index cc39cfbfce..7a4d059690 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileTest.java @@ -24,6 +24,7 @@ import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.tieredstore.TieredStoreTestUtil; import org.apache.rocketmq.tieredstore.common.FileSegmentType; import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig; +import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor; import org.apache.rocketmq.tieredstore.metadata.FileSegmentMetadata; import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore; import org.apache.rocketmq.tieredstore.provider.TieredFileSegment; @@ -55,6 +56,7 @@ public class TieredFlatFileTest { public void tearDown() throws IOException { TieredStoreTestUtil.destroyMetadataStore(); TieredStoreTestUtil.destroyTempDir(storePath); + TieredStoreExecutor.shutdown(); } private List<FileSegmentMetadata> getSegmentMetadataList(TieredMetadataStore metadataStore) { diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java index 262d6645b3..2da72bc7a7 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/file/TieredIndexFileTest.java @@ -87,5 +87,7 @@ public class TieredIndexFileTest { indexList = indexFile.queryAsync(mq.getTopic(), "key1", 1000, 1200).join(); Assert.assertEquals(1, indexList.size()); + + indexFile.destroy(); } } diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/MockTieredFileSegmentInputStream.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/MockFileSegmentInputStream.java similarity index 82% rename from tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/MockTieredFileSegmentInputStream.java rename to tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/MockFileSegmentInputStream.java index a6566b7de5..3bbe41dd4b 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/MockTieredFileSegmentInputStream.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/MockFileSegmentInputStream.java @@ -20,13 +20,13 @@ package org.apache.rocketmq.tieredstore.provider; import java.io.InputStream; import java.nio.ByteBuffer; import java.util.List; -import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream; +import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream; -public class MockTieredFileSegmentInputStream extends TieredFileSegmentInputStream { +public class MockFileSegmentInputStream extends FileSegmentInputStream { private final InputStream inputStream; - public MockTieredFileSegmentInputStream(InputStream inputStream) { + public MockFileSegmentInputStream(InputStream inputStream) { super(null, null, Integer.MAX_VALUE); this.inputStream = inputStream; } @@ -43,7 +43,7 @@ public class MockTieredFileSegmentInputStream extends TieredFileSegmentInputStre } @Override - public List<ByteBuffer> getUploadBufferList() { + public List<ByteBuffer> getBufferList() { return null; } diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentInputStreamTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentInputStreamTest.java index a2554ba3d1..743d9182ce 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentInputStreamTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentInputStreamTest.java @@ -28,8 +28,8 @@ import java.util.Random; import org.apache.rocketmq.tieredstore.common.FileSegmentType; import org.apache.rocketmq.tieredstore.file.TieredCommitLog; import org.apache.rocketmq.tieredstore.file.TieredConsumeQueue; -import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream; -import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStreamFactory; +import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream; +import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStreamFactory; import org.apache.rocketmq.tieredstore.util.MessageBufferUtil; import org.apache.rocketmq.tieredstore.util.MessageBufferUtilTest; import org.junit.Assert; @@ -57,7 +57,7 @@ public class TieredFileSegmentInputStreamTest { bufferSize += byteBuffer.remaining(); } - // build expected byte buffer for verifying the TieredFileSegmentInputStream + // build expected byte buffer for verifying the FileSegmentInputStream ByteBuffer expectedByteBuffer = ByteBuffer.allocate(bufferSize); for (ByteBuffer byteBuffer : uploadBufferList) { expectedByteBuffer.put(byteBuffer); @@ -74,7 +74,7 @@ public class TieredFileSegmentInputStreamTest { int[] batchReadSizeTestSet = { MessageBufferUtil.PHYSICAL_OFFSET_POSITION - 1, MessageBufferUtil.PHYSICAL_OFFSET_POSITION, MessageBufferUtil.PHYSICAL_OFFSET_POSITION + 1, MSG_LEN - 1, MSG_LEN, MSG_LEN + 1 }; - verifyReadAndReset(expectedByteBuffer, () -> TieredFileSegmentInputStreamFactory.build( + verifyReadAndReset(expectedByteBuffer, () -> FileSegmentInputStreamFactory.build( FileSegmentType.COMMIT_LOG, COMMIT_LOG_START_OFFSET, uploadBufferList, null, finalBufferSize), finalBufferSize, batchReadSizeTestSet); } @@ -98,7 +98,7 @@ public class TieredFileSegmentInputStreamTest { int codaBufferSize = codaBuffer.remaining(); bufferSize += codaBufferSize; - // build expected byte buffer for verifying the TieredFileSegmentInputStream + // build expected byte buffer for verifying the FileSegmentInputStream ByteBuffer expectedByteBuffer = ByteBuffer.allocate(bufferSize); for (ByteBuffer byteBuffer : uploadBufferList) { expectedByteBuffer.put(byteBuffer); @@ -119,7 +119,7 @@ public class TieredFileSegmentInputStreamTest { MSG_LEN - 1, MSG_LEN, MSG_LEN + 1, bufferSize - 1, bufferSize, bufferSize + 1 }; - verifyReadAndReset(expectedByteBuffer, () -> TieredFileSegmentInputStreamFactory.build( + verifyReadAndReset(expectedByteBuffer, () -> FileSegmentInputStreamFactory.build( FileSegmentType.COMMIT_LOG, COMMIT_LOG_START_OFFSET, uploadBufferList, codaBuffer, finalBufferSize), finalBufferSize, batchReadSizeTestSet); } @@ -134,7 +134,7 @@ public class TieredFileSegmentInputStreamTest { bufferSize += byteBuffer.remaining(); } - // build expected byte buffer for verifying the TieredFileSegmentInputStream + // build expected byte buffer for verifying the FileSegmentInputStream ByteBuffer expectedByteBuffer = ByteBuffer.allocate(bufferSize); for (ByteBuffer byteBuffer : uploadBufferList) { expectedByteBuffer.put(byteBuffer); @@ -143,7 +143,7 @@ public class TieredFileSegmentInputStreamTest { int finalBufferSize = bufferSize; int[] batchReadSizeTestSet = {TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE - 1, TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE, TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE + 1}; - verifyReadAndReset(expectedByteBuffer, () -> TieredFileSegmentInputStreamFactory.build( + verifyReadAndReset(expectedByteBuffer, () -> FileSegmentInputStreamFactory.build( FileSegmentType.CONSUME_QUEUE, COMMIT_LOG_START_OFFSET, uploadBufferList, null, finalBufferSize), bufferSize, batchReadSizeTestSet); } @@ -156,16 +156,16 @@ public class TieredFileSegmentInputStreamTest { byteBuffer.flip(); List<ByteBuffer> uploadBufferList = Arrays.asList(byteBuffer); - // build expected byte buffer for verifying the TieredFileSegmentInputStream + // build expected byte buffer for verifying the FileSegmentInputStream ByteBuffer expectedByteBuffer = byteBuffer.slice(); - verifyReadAndReset(expectedByteBuffer, () -> TieredFileSegmentInputStreamFactory.build( + verifyReadAndReset(expectedByteBuffer, () -> FileSegmentInputStreamFactory.build( FileSegmentType.INDEX, COMMIT_LOG_START_OFFSET, uploadBufferList, null, byteBuffer.limit()), byteBuffer.limit(), new int[] {23, 24, 25}); } - private void verifyReadAndReset(ByteBuffer expectedByteBuffer, Supplier<TieredFileSegmentInputStream> constructor, + private void verifyReadAndReset(ByteBuffer expectedByteBuffer, Supplier<FileSegmentInputStream> constructor, int bufferSize, int[] readBatchSizeTestSet) { - TieredFileSegmentInputStream inputStream = constructor.get(); + FileSegmentInputStream inputStream = constructor.get(); // verify verifyInputStream(inputStream, expectedByteBuffer); diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java index 4cd83e0d26..a655710a50 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegmentTest.java @@ -116,13 +116,22 @@ public class TieredFileSegmentTest { } @Test - public void testCommitFailed() { + public void testCommitFailedThenSuccess() { long startTime = System.currentTimeMillis(); MemoryFileSegment segment = (MemoryFileSegment) createFileSegment(FileSegmentType.COMMIT_LOG); long lastSize = segment.getSize(); - segment.append(MessageBufferUtilTest.buildMockedMessageBuffer(), 0); - segment.append(MessageBufferUtilTest.buildMockedMessageBuffer(), 0); - + segment.setCheckSize(false); + segment.initPosition(lastSize); + segment.setSize((int) lastSize); + + ByteBuffer buffer1 = MessageBufferUtilTest.buildMockedMessageBuffer().putLong( + MessageBufferUtil.PHYSICAL_OFFSET_POSITION, baseOffset + lastSize); + ByteBuffer buffer2 = MessageBufferUtilTest.buildMockedMessageBuffer().putLong( + MessageBufferUtil.PHYSICAL_OFFSET_POSITION, baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN); + segment.append(buffer1, 0); + segment.append(buffer2, 0); + + // Mock new message arrive segment.blocker = new CompletableFuture<>(); new Thread(() -> { try { @@ -131,20 +140,88 @@ public class TieredFileSegmentTest { Assert.fail(e.getMessage()); } ByteBuffer buffer = MessageBufferUtilTest.buildMockedMessageBuffer(); + buffer.putLong(MessageBufferUtil.PHYSICAL_OFFSET_POSITION, MessageBufferUtilTest.MSG_LEN * 2); buffer.putLong(MessageBufferUtil.STORE_TIMESTAMP_POSITION, startTime); segment.append(buffer, 0); segment.blocker.complete(false); }).start(); + // Commit failed segment.commit(); segment.blocker.join(); + segment.blocker = null; + + // Copy data and assume commit success + segment.getMemStore().put(buffer1); + segment.getMemStore().put(buffer2); + segment.setSize((int) (lastSize + MessageBufferUtilTest.MSG_LEN * 2)); - segment.blocker = new CompletableFuture<>(); - segment.blocker.complete(true); segment.commit(); + Assert.assertEquals(lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getCommitPosition()); + Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getCommitOffset()); + Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getMaxOffset()); + + ByteBuffer msg1 = segment.read(lastSize, MessageBufferUtilTest.MSG_LEN); + Assert.assertEquals(baseOffset + lastSize, MessageBufferUtil.getCommitLogOffset(msg1)); + + ByteBuffer msg2 = segment.read(lastSize + MessageBufferUtilTest.MSG_LEN, MessageBufferUtilTest.MSG_LEN); + Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN, MessageBufferUtil.getCommitLogOffset(msg2)); + + ByteBuffer msg3 = segment.read(lastSize + MessageBufferUtilTest.MSG_LEN * 2, MessageBufferUtilTest.MSG_LEN); + Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 2, MessageBufferUtil.getCommitLogOffset(msg3)); + } + + @Test + public void testCommitFailed3Times() { + long startTime = System.currentTimeMillis(); + MemoryFileSegment segment = (MemoryFileSegment) createFileSegment(FileSegmentType.COMMIT_LOG); + long lastSize = segment.getSize(); + segment.setCheckSize(false); + segment.initPosition(lastSize); + segment.setSize((int) lastSize); + + ByteBuffer buffer1 = MessageBufferUtilTest.buildMockedMessageBuffer().putLong( + MessageBufferUtil.PHYSICAL_OFFSET_POSITION, baseOffset + lastSize); + ByteBuffer buffer2 = MessageBufferUtilTest.buildMockedMessageBuffer().putLong( + MessageBufferUtil.PHYSICAL_OFFSET_POSITION, baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN); + segment.append(buffer1, 0); + segment.append(buffer2, 0); + + // Mock new message arrive + segment.blocker = new CompletableFuture<>(); + new Thread(() -> { + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + Assert.fail(e.getMessage()); + } + ByteBuffer buffer = MessageBufferUtilTest.buildMockedMessageBuffer(); + buffer.putLong(MessageBufferUtil.PHYSICAL_OFFSET_POSITION, MessageBufferUtilTest.MSG_LEN * 2); + buffer.putLong(MessageBufferUtil.STORE_TIMESTAMP_POSITION, startTime); + segment.append(buffer, 0); + segment.blocker.complete(false); + }).start(); + + for (int i = 0; i < 3; i++) { + segment.commit(); + } + + Assert.assertEquals(lastSize, segment.getCommitPosition()); + Assert.assertEquals(baseOffset + lastSize, segment.getCommitOffset()); + Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getMaxOffset()); + + segment.blocker.join(); + segment.blocker = null; + segment.commit(); + Assert.assertEquals(lastSize + MessageBufferUtilTest.MSG_LEN * 2, segment.getCommitPosition()); + Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 2, segment.getCommitOffset()); Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getMaxOffset()); + + segment.commit(); + Assert.assertEquals(lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getCommitPosition()); Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getCommitOffset()); + Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getMaxOffset()); ByteBuffer msg1 = segment.read(lastSize, MessageBufferUtilTest.MSG_LEN); Assert.assertEquals(baseOffset + lastSize, MessageBufferUtil.getCommitLogOffset(msg1)); diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegment.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegment.java index cb155cf8f7..80ad41f685 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegment.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegment.java @@ -23,7 +23,7 @@ import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.tieredstore.common.FileSegmentType; import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig; import org.apache.rocketmq.tieredstore.provider.TieredFileSegment; -import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream; +import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream; import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; import org.junit.Assert; @@ -33,6 +33,8 @@ public class MemoryFileSegment extends TieredFileSegment { public CompletableFuture<Boolean> blocker; + protected int size = 0; + protected boolean checkSize = true; public MemoryFileSegment(FileSegmentType fileType, MessageQueue messageQueue, long baseOffset, @@ -56,6 +58,18 @@ public class MemoryFileSegment extends TieredFileSegment { memStore.position((int) getSize()); } + public boolean isCheckSize() { + return checkSize; + } + + public void setCheckSize(boolean checkSize) { + this.checkSize = checkSize; + } + + public ByteBuffer getMemStore() { + return memStore; + } + @Override public String getPath() { return filePath; @@ -66,7 +80,11 @@ public class MemoryFileSegment extends TieredFileSegment { if (checkSize) { return 1000; } - return 0; + return size; + } + + public void setSize(int size) { + this.size = size; } @Override @@ -85,11 +103,11 @@ public class MemoryFileSegment extends TieredFileSegment { @Override public CompletableFuture<Boolean> commit0( - TieredFileSegmentInputStream inputStream, long position, int length, boolean append) { + FileSegmentInputStream inputStream, long position, int length, boolean append) { try { if (blocker != null && !blocker.get()) { - throw new IllegalStateException(); + throw new IllegalStateException("Commit Exception for Memory Test"); } } catch (InterruptedException | ExecutionException e) { Assert.fail(e.getMessage()); @@ -98,7 +116,6 @@ public class MemoryFileSegment extends TieredFileSegment { Assert.assertTrue(!checkSize || position >= getSize()); byte[] buffer = new byte[1024]; - int startPos = memStore.position(); try { int len; diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegmentWithoutCheck.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegmentWithoutCheck.java index 8ac330b370..630fd22236 100644 --- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegmentWithoutCheck.java +++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/memory/MemoryFileSegmentWithoutCheck.java @@ -22,7 +22,7 @@ import java.util.concurrent.ExecutionException; import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.tieredstore.common.FileSegmentType; import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig; -import org.apache.rocketmq.tieredstore.provider.inputstream.TieredFileSegmentInputStream; +import org.apache.rocketmq.tieredstore.provider.stream.FileSegmentInputStream; import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; import org.junit.Assert; @@ -46,7 +46,7 @@ public class MemoryFileSegmentWithoutCheck extends MemoryFileSegment { } @Override - public CompletableFuture<Boolean> commit0(TieredFileSegmentInputStream inputStream, long position, int length, + public CompletableFuture<Boolean> commit0(FileSegmentInputStream inputStream, long position, int length, boolean append) { try { if (blocker != null && !blocker.get()) {