This is an automated email from the ASF dual-hosted git repository. lizhimin pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new fd32dae2ab [ISSUE #6633] Not clear uninitialized files and fix metadata recover (#7342) fd32dae2ab is described below commit fd32dae2ab59f86dd215eeec405bf4fa6212bcb3 Author: lizhimins <707364...@qq.com> AuthorDate: Tue Sep 12 19:58:08 2023 +0800 [ISSUE #6633] Not clear uninitialized files and fix metadata recover (#7342) --- .../rocketmq/tieredstore/file/TieredFlatFile.java | 53 ++++++++-------------- .../tieredstore/file/TieredFlatFileManager.java | 10 ++-- 2 files changed, 22 insertions(+), 41 deletions(-) diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java index d973179eed..d96eb6e8f3 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java @@ -16,7 +16,6 @@ */ package org.apache.rocketmq.tieredstore.file; -import com.alibaba.fastjson.JSON; import com.google.common.annotations.VisibleForTesting; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -25,13 +24,13 @@ import java.util.Comparator; import java.util.HashSet; import java.util.LinkedList; import java.util.List; -import java.util.Objects; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; import javax.annotation.Nullable; +import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.logging.org.slf4j.Logger; import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; import org.apache.rocketmq.tieredstore.common.AppendResult; @@ -43,7 +42,6 @@ import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore; import org.apache.rocketmq.tieredstore.provider.FileSegmentAllocator; import org.apache.rocketmq.tieredstore.provider.TieredFileSegment; import org.apache.rocketmq.tieredstore.util.TieredStoreUtil; -import org.apache.rocketmq.common.BoundaryType; public class TieredFlatFile { @@ -177,7 +175,10 @@ public class TieredFlatFile { } } - private FileSegmentMetadata getOrCreateFileSegmentMetadata(TieredFileSegment fileSegment) { + /** + * FileQueue Status: Sealed | Sealed | Sealed | Not sealed, Allow appended && Not Full + */ + public void updateFileSegment(TieredFileSegment fileSegment) { FileSegmentMetadata metadata = tieredMetadataStore.getFileSegment( this.filePath, fileSegment.getFileType(), fileSegment.getBaseOffset()); @@ -186,45 +187,24 @@ public class TieredFlatFile { if (metadata == null) { metadata = new FileSegmentMetadata( this.filePath, fileSegment.getBaseOffset(), fileSegment.getFileType().getType()); - metadata.setCreateTimestamp(fileSegment.getMinTimestamp()); - metadata.setBeginTimestamp(fileSegment.getMinTimestamp()); - metadata.setEndTimestamp(fileSegment.getMaxTimestamp()); - if (fileSegment.isClosed()) { - metadata.setStatus(FileSegmentMetadata.STATUS_DELETED); - } - this.tieredMetadataStore.updateFileSegment(metadata); + metadata.setCreateTimestamp(System.currentTimeMillis()); } - return metadata; - } - - /** - * FileQueue Status: Sealed | Sealed | Sealed | Not sealed, Allow appended && Not Full - */ - public void updateFileSegment(TieredFileSegment fileSegment) { - FileSegmentMetadata segmentMetadata = getOrCreateFileSegmentMetadata(fileSegment); - if (segmentMetadata.getStatus() == FileSegmentMetadata.STATUS_NEW - && fileSegment.isFull() - && !fileSegment.needCommit()) { + metadata.setSize(fileSegment.getCommitPosition()); + metadata.setBeginTimestamp(fileSegment.getMinTimestamp()); + metadata.setEndTimestamp(fileSegment.getMaxTimestamp()); - segmentMetadata.markSealed(); + if (fileSegment.isFull() && !fileSegment.needCommit()) { + if (metadata.getStatus() == FileSegmentMetadata.STATUS_NEW) { + metadata.markSealed(); + } } if (fileSegment.isClosed()) { - segmentMetadata.setStatus(FileSegmentMetadata.STATUS_DELETED); + metadata.setStatus(FileSegmentMetadata.STATUS_DELETED); } - segmentMetadata.setSize(fileSegment.getCommitPosition()); - segmentMetadata.setEndTimestamp(fileSegment.getMaxTimestamp()); - - FileSegmentMetadata metadata = tieredMetadataStore.getFileSegment( - this.filePath, fileSegment.getFileType(), fileSegment.getBaseOffset()); - - if (!Objects.equals(metadata, segmentMetadata)) { - this.tieredMetadataStore.updateFileSegment(segmentMetadata); - logger.debug("TieredFlatFile#UpdateSegmentMetadata, filePath: {}, content: {}", - segmentMetadata.getPath(), JSON.toJSONString(segmentMetadata)); - } + this.tieredMetadataStore.updateFileSegment(metadata); } private void checkAndFixFileSize() { @@ -598,6 +578,9 @@ public class TieredFlatFile { logger.error("TieredFlatFile#destroy: mark file segment: {} is deleted failed", fileSegment.getPath(), e); } fileSegment.destroyFile(); + if (!fileSegment.exists()) { + tieredMetadataStore.deleteFileSegment(filePath, fileType, fileSegment.getBaseOffset()); + } } fileSegmentList.clear(); needCommitFileSegmentList.clear(); diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java index 7c744af3b9..087ea8c9ce 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java @@ -136,15 +136,13 @@ public class TieredFlatFileManager { TimeUnit.HOURS.toMillis(storeConfig.getTieredStoreFileReservedTime()); for (CompositeQueueFlatFile flatFile : deepCopyFlatFileToList()) { TieredStoreExecutor.cleanExpiredFileExecutor.submit(() -> { - flatFile.getCompositeFlatFileLock().lock(); try { + flatFile.getCompositeFlatFileLock().lock(); flatFile.cleanExpiredFile(expiredTimeStamp); flatFile.destroyExpiredFile(); - if (flatFile.getConsumeQueueBaseOffset() == -1) { - logger.info("Clean flatFile because file not initialized, topic={}, queueId={}", - flatFile.getMessageQueue().getTopic(), flatFile.getMessageQueue().getQueueId()); - destroyCompositeFile(flatFile.getMessageQueue()); - } + } catch (Throwable t) { + logger.error("Do Clean expired file error, topic={}, queueId={}", + flatFile.getMessageQueue().getTopic(), flatFile.getMessageQueue().getQueueId(), t); } finally { flatFile.getCompositeFlatFileLock().unlock(); }