This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new fd32dae2ab [ISSUE #6633] Not clear uninitialized files and fix 
metadata recover (#7342)
fd32dae2ab is described below

commit fd32dae2ab59f86dd215eeec405bf4fa6212bcb3
Author: lizhimins <707364...@qq.com>
AuthorDate: Tue Sep 12 19:58:08 2023 +0800

    [ISSUE #6633] Not clear uninitialized files and fix metadata recover (#7342)
---
 .../rocketmq/tieredstore/file/TieredFlatFile.java  | 53 ++++++++--------------
 .../tieredstore/file/TieredFlatFileManager.java    | 10 ++--
 2 files changed, 22 insertions(+), 41 deletions(-)

diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
index d973179eed..d96eb6e8f3 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java
@@ -16,7 +16,6 @@
  */
 package org.apache.rocketmq.tieredstore.file;
 
-import com.alibaba.fastjson.JSON;
 import com.google.common.annotations.VisibleForTesting;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -25,13 +24,13 @@ import java.util.Comparator;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
+import org.apache.rocketmq.common.BoundaryType;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.tieredstore.common.AppendResult;
@@ -43,7 +42,6 @@ import 
org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
 import org.apache.rocketmq.tieredstore.provider.FileSegmentAllocator;
 import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
 import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
-import org.apache.rocketmq.common.BoundaryType;
 
 public class TieredFlatFile {
 
@@ -177,7 +175,10 @@ public class TieredFlatFile {
         }
     }
 
-    private FileSegmentMetadata 
getOrCreateFileSegmentMetadata(TieredFileSegment fileSegment) {
+    /**
+     * FileQueue Status: Sealed | Sealed | Sealed | Not sealed, Allow appended 
&& Not Full
+     */
+    public void updateFileSegment(TieredFileSegment fileSegment) {
 
         FileSegmentMetadata metadata = tieredMetadataStore.getFileSegment(
             this.filePath, fileSegment.getFileType(), 
fileSegment.getBaseOffset());
@@ -186,45 +187,24 @@ public class TieredFlatFile {
         if (metadata == null) {
             metadata = new FileSegmentMetadata(
                 this.filePath, fileSegment.getBaseOffset(), 
fileSegment.getFileType().getType());
-            metadata.setCreateTimestamp(fileSegment.getMinTimestamp());
-            metadata.setBeginTimestamp(fileSegment.getMinTimestamp());
-            metadata.setEndTimestamp(fileSegment.getMaxTimestamp());
-            if (fileSegment.isClosed()) {
-                metadata.setStatus(FileSegmentMetadata.STATUS_DELETED);
-            }
-            this.tieredMetadataStore.updateFileSegment(metadata);
+            metadata.setCreateTimestamp(System.currentTimeMillis());
         }
-        return metadata;
-    }
-
-    /**
-     * FileQueue Status: Sealed | Sealed | Sealed | Not sealed, Allow appended 
&& Not Full
-     */
-    public void updateFileSegment(TieredFileSegment fileSegment) {
-        FileSegmentMetadata segmentMetadata = 
getOrCreateFileSegmentMetadata(fileSegment);
 
-        if (segmentMetadata.getStatus() == FileSegmentMetadata.STATUS_NEW
-            && fileSegment.isFull()
-            && !fileSegment.needCommit()) {
+        metadata.setSize(fileSegment.getCommitPosition());
+        metadata.setBeginTimestamp(fileSegment.getMinTimestamp());
+        metadata.setEndTimestamp(fileSegment.getMaxTimestamp());
 
-            segmentMetadata.markSealed();
+        if (fileSegment.isFull() && !fileSegment.needCommit()) {
+            if (metadata.getStatus() == FileSegmentMetadata.STATUS_NEW) {
+                metadata.markSealed();
+            }
         }
 
         if (fileSegment.isClosed()) {
-            segmentMetadata.setStatus(FileSegmentMetadata.STATUS_DELETED);
+            metadata.setStatus(FileSegmentMetadata.STATUS_DELETED);
         }
 
-        segmentMetadata.setSize(fileSegment.getCommitPosition());
-        segmentMetadata.setEndTimestamp(fileSegment.getMaxTimestamp());
-
-        FileSegmentMetadata metadata = tieredMetadataStore.getFileSegment(
-            this.filePath, fileSegment.getFileType(), 
fileSegment.getBaseOffset());
-
-        if (!Objects.equals(metadata, segmentMetadata)) {
-            this.tieredMetadataStore.updateFileSegment(segmentMetadata);
-            logger.debug("TieredFlatFile#UpdateSegmentMetadata, filePath: {}, 
content: {}",
-                segmentMetadata.getPath(), JSON.toJSONString(segmentMetadata));
-        }
+        this.tieredMetadataStore.updateFileSegment(metadata);
     }
 
     private void checkAndFixFileSize() {
@@ -598,6 +578,9 @@ public class TieredFlatFile {
                     logger.error("TieredFlatFile#destroy: mark file segment: 
{} is deleted failed", fileSegment.getPath(), e);
                 }
                 fileSegment.destroyFile();
+                if (!fileSegment.exists()) {
+                    tieredMetadataStore.deleteFileSegment(filePath, fileType, 
fileSegment.getBaseOffset());
+                }
             }
             fileSegmentList.clear();
             needCommitFileSegmentList.clear();
diff --git 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
index 7c744af3b9..087ea8c9ce 100644
--- 
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
+++ 
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java
@@ -136,15 +136,13 @@ public class TieredFlatFileManager {
             
TimeUnit.HOURS.toMillis(storeConfig.getTieredStoreFileReservedTime());
         for (CompositeQueueFlatFile flatFile : deepCopyFlatFileToList()) {
             TieredStoreExecutor.cleanExpiredFileExecutor.submit(() -> {
-                flatFile.getCompositeFlatFileLock().lock();
                 try {
+                    flatFile.getCompositeFlatFileLock().lock();
                     flatFile.cleanExpiredFile(expiredTimeStamp);
                     flatFile.destroyExpiredFile();
-                    if (flatFile.getConsumeQueueBaseOffset() == -1) {
-                        logger.info("Clean flatFile because file not 
initialized, topic={}, queueId={}",
-                            flatFile.getMessageQueue().getTopic(), 
flatFile.getMessageQueue().getQueueId());
-                        destroyCompositeFile(flatFile.getMessageQueue());
-                    }
+                } catch (Throwable t) {
+                    logger.error("Do Clean expired file error, topic={}, 
queueId={}",
+                        flatFile.getMessageQueue().getTopic(), 
flatFile.getMessageQueue().getQueueId(), t);
                 } finally {
                     flatFile.getCompositeFlatFileLock().unlock();
                 }

Reply via email to