This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git

commit 2758df713ead7fd78fdee9f8a2af2ad2ce377fa9
Author: xleoken <[email protected]>
AuthorDate: Mon Oct 23 10:26:16 2023 +0800

    [CELEBORN-1030] Improve the logic of deleting md5 files when initializing 
SimpleStateMachineStorage
    
    ### What changes were proposed in this pull request?
    
    With ratis-2.0.0 we had to override SimpleStateMachineStorage at 
initialization to delete md5 files, but cleanup of md5 files has been supported 
by Ratis since RATIS-1752, so we can simplify the initialization.
    
    Remove `MasterStateMachineSuiteJ#testSnapshotCleanup`; cleanup of 
snapshots and md5 files is already tested in
    
https://github.com/apache/ratis/blob/release-2.5.1/ratis-test/src/test/java/org/apache/ratis/server/storage/TestRaftStorage.java#L221
    
    <br>
    
    **links:**
    
    https://issues.apache.org/jira/browse/RATIS-1752
    
    
https://github.com/apache/ratis/blob/release-2.5.1/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/SimpleStateMachineStorage.java#L105
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    Tested locally.
    
    Closes #1966 from xleoken/patch.
    
    Authored-by: xleoken <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 .../deploy/master/clustermeta/ha/StateMachine.java | 111 +--------------------
 .../clustermeta/ha/MasterStateMachineSuiteJ.java   |  98 ------------------
 2 files changed, 1 insertion(+), 208 deletions(-)

diff --git 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java
 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java
index 5b3582f61..db537671e 100644
--- 
a/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java
+++ 
b/master/src/main/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/StateMachine.java
@@ -25,17 +25,9 @@ import static org.apache.ratis.util.LifeCycle.State.STARTING;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.nio.file.DirectoryStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Comparator;
-import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -47,10 +39,8 @@ import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.raftlog.RaftLog;
-import org.apache.ratis.server.storage.FileInfo;
 import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.statemachine.SnapshotInfo;
-import org.apache.ratis.statemachine.SnapshotRetentionPolicy;
 import org.apache.ratis.statemachine.StateMachineStorage;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
@@ -59,7 +49,6 @@ import 
org.apache.ratis.statemachine.impl.SimpleStateMachineStorageUtil;
 import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.util.ExitUtils;
-import org.apache.ratis.util.FileUtils;
 import org.apache.ratis.util.LifeCycle;
 import org.apache.ratis.util.MD5FileUtil;
 import org.slf4j.Logger;
@@ -72,94 +61,7 @@ import 
org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos.Reso
 public class StateMachine extends BaseStateMachine {
   private static final Logger LOG = 
LoggerFactory.getLogger(StateMachine.class);
 
-  public static final Pattern MD5_REGEX = 
Pattern.compile("snapshot\\.(\\d+)_(\\d+)\\.md5");
-
-  private final SimpleStateMachineStorage storage =
-      new SimpleStateMachineStorage() {
-        /**
-         * we need to delete md5 file as the same time as snapshot file 
deleted, so we override the
-         * SimpleStateMachineStorage.cleanupOldSnapshots method, add delete 
md5 file action.
-         *
-         * @param snapshotRetentionPolicy snapshot retention policy
-         * @throws IOException
-         */
-        @Override
-        public void cleanupOldSnapshots(SnapshotRetentionPolicy 
snapshotRetentionPolicy)
-            throws IOException {
-          if (snapshotRetentionPolicy != null
-              && snapshotRetentionPolicy.getNumSnapshotsRetained() > 0) {
-            List<SingleFileSnapshotInfo> allSnapshotFiles = new ArrayList<>();
-            List<SingleFileSnapshotInfo> allMD5Files = new ArrayList<>();
-            try (DirectoryStream<Path> stream =
-                
Files.newDirectoryStream(SimpleStateMachineStorageUtil.getSmDir(this).toPath()))
 {
-              for (Path path : stream) {
-                if (filePatternMatches(SNAPSHOT_REGEX, allSnapshotFiles, 
path)) {
-                  continue;
-                } else {
-                  filePatternMatches(MD5_REGEX, allMD5Files, path);
-                }
-              }
-            }
-            // first step, cleanup old snapshot and md5 file
-            SingleFileSnapshotInfo snapshotInfo =
-                cleanupOldFiles(
-                    allSnapshotFiles,
-                    snapshotRetentionPolicy.getNumSnapshotsRetained(),
-                    false,
-                    null);
-            // second step, cleanup only old md5 file
-            cleanupOldFiles(
-                allMD5Files, 
snapshotRetentionPolicy.getNumSnapshotsRetained(), true, snapshotInfo);
-          }
-        }
-
-        private boolean filePatternMatches(
-            Pattern pattern, List<SingleFileSnapshotInfo> result, Path 
filePath) {
-          Matcher md5Matcher = 
pattern.matcher(filePath.getFileName().toString());
-          if (md5Matcher.matches()) {
-            final long endIndex = Long.parseLong(md5Matcher.group(2));
-            final long term = Long.parseLong(md5Matcher.group(1));
-            final FileInfo fileInfo = new FileInfo(filePath, null);
-            result.add(new SingleFileSnapshotInfo(fileInfo, term, endIndex));
-            return true;
-          }
-          return false;
-        }
-
-        private SingleFileSnapshotInfo cleanupOldFiles(
-            List<SingleFileSnapshotInfo> inputFiles,
-            int retainedNum,
-            boolean onlyCleanupMD5Files,
-            SingleFileSnapshotInfo snapshotInfo) {
-          SingleFileSnapshotInfo result = null;
-          if (inputFiles.size() > retainedNum) {
-            inputFiles.sort(new RatisSnapshotFileComparator());
-            List<SingleFileSnapshotInfo> filesToBeCleaned =
-                inputFiles.subList(retainedNum, inputFiles.size());
-            result = filesToBeCleaned.get(0);
-            for (SingleFileSnapshotInfo fileInfo : filesToBeCleaned) {
-              if ((null != snapshotInfo && (fileInfo.getIndex() >= 
snapshotInfo.getIndex())
-                  || (onlyCleanupMD5Files && null == snapshotInfo))) {
-                continue;
-              }
-              File file = fileInfo.getFile().getPath().toFile();
-              if (onlyCleanupMD5Files) {
-                LOG.info("Deleting old md5 file at {}.", 
file.getAbsolutePath());
-                FileUtils.deleteFileQuietly(file);
-              } else {
-                File md5File = new File(file.getAbsolutePath() + 
MD5FileUtil.MD5_SUFFIX);
-                LOG.info(
-                    "Deleting old snapshot at {}, md5 file at {}.",
-                    file.getAbsolutePath(),
-                    md5File.getAbsolutePath());
-                FileUtils.deleteFileQuietly(file);
-                FileUtils.deleteFileQuietly(md5File);
-              }
-            }
-          }
-          return result;
-        }
-      };
+  private final SimpleStateMachineStorage storage = new 
SimpleStateMachineStorage();
 
   private final HARaftServer masterRatisServer;
   private RaftGroupId raftGroupId;
@@ -419,14 +321,3 @@ public class StateMachine extends BaseStateMachine {
     return this.storage;
   }
 }
-
-/**
- * Compare snapshot files based on transaction indexes. Copy from
- * org.apache.ratis.statemachine.impl.SnapshotFileComparator
- */
-class RatisSnapshotFileComparator implements 
Comparator<SingleFileSnapshotInfo> {
-  @Override
-  public int compare(SingleFileSnapshotInfo file1, SingleFileSnapshotInfo 
file2) {
-    return (int) (file2.getIndex() - file1.getIndex());
-  }
-}
diff --git 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
index a136a1ea7..51f669a53 100644
--- 
a/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
+++ 
b/master/src/test/java/org/apache/celeborn/service/deploy/master/clustermeta/ha/MasterStateMachineSuiteJ.java
@@ -19,21 +19,11 @@ package 
org.apache.celeborn.service.deploy.master.clustermeta.ha;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.regex.Matcher;
 
-import org.apache.ratis.server.storage.RaftStorage;
-import org.apache.ratis.server.storage.StorageImplUtils;
 import org.apache.ratis.statemachine.SnapshotInfo;
-import org.apache.ratis.statemachine.SnapshotRetentionPolicy;
-import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
-import org.apache.ratis.statemachine.impl.SimpleStateMachineStorageUtil;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -44,7 +34,6 @@ import org.apache.celeborn.common.meta.DiskInfo;
 import org.apache.celeborn.common.meta.WorkerInfo;
 import org.apache.celeborn.common.quota.ResourceConsumption;
 import org.apache.celeborn.common.util.JavaUtils;
-import org.apache.celeborn.common.util.Utils;
 import org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos;
 import 
org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos.RequestSlotsRequest;
 import 
org.apache.celeborn.service.deploy.master.clustermeta.ResourceProtos.ResourceRequest;
@@ -114,93 +103,6 @@ public class MasterStateMachineSuiteJ extends 
RatisBaseSuiteJ {
     Assert.assertEquals(1, latest.getFiles().size());
   }
 
-  @Test
-  public void testSnapshotCleanup() throws IOException {
-    StateMachine stateMachine = ratisServer.getMasterStateMachine();
-    SnapshotRetentionPolicy snapshotRetentionPolicy =
-        new SnapshotRetentionPolicy() {
-          @Override
-          public int getNumSnapshotsRetained() {
-            return 3;
-          }
-        };
-
-    File storageDir = Utils.createTempDir("./", "snapshot");
-
-    System.out.println(storageDir);
-    final RaftStorage storage =
-        StorageImplUtils.newRaftStorage(storageDir, null, 
RaftStorage.StartupOption.FORMAT, 100);
-    storage.initialize();
-    SimpleStateMachineStorage simpleStateMachineStorage =
-        (SimpleStateMachineStorage) stateMachine.getStateMachineStorage();
-    simpleStateMachineStorage.init(storage);
-
-    List<Long> indices = new ArrayList<>();
-
-    // Create 5 snapshot files in storage dir.
-    for (int i = 0; i < 5; i++) {
-      final long term = ThreadLocalRandom.current().nextLong(3L, 10L);
-      final long index = ThreadLocalRandom.current().nextLong(100L, 1000L);
-      indices.add(index);
-      File snapshotFile = simpleStateMachineStorage.getSnapshotFile(term, 
index);
-      snapshotFile.createNewFile();
-      File md5File = new File(snapshotFile.getAbsolutePath() + ".md5");
-      md5File.createNewFile();
-    }
-
-    // following 2 md5 files will be deleted
-    File snapshotFile1 = simpleStateMachineStorage.getSnapshotFile(1, 1);
-    File md5File1 = new File(snapshotFile1.getAbsolutePath() + ".md5");
-    md5File1.createNewFile();
-    File snapshotFile2 = simpleStateMachineStorage.getSnapshotFile(5, 2);
-    File md5File2 = new File(snapshotFile2.getAbsolutePath() + ".md5");
-    md5File2.createNewFile();
-    // this md5 file will not be deleted
-    File snapshotFile3 = simpleStateMachineStorage.getSnapshotFile(11, 1001);
-    File md5File3 = new File(snapshotFile3.getAbsolutePath() + ".md5");
-    md5File3.createNewFile();
-
-    File stateMachineDir = 
SimpleStateMachineStorageUtil.getSmDir(simpleStateMachineStorage);
-    Assert.assertTrue(stateMachineDir.listFiles().length == 13);
-    simpleStateMachineStorage.cleanupOldSnapshots(snapshotRetentionPolicy);
-    File[] remainingFiles = stateMachineDir.listFiles();
-    Assert.assertTrue(remainingFiles.length == 7);
-
-    Collections.sort(indices);
-    Collections.reverse(indices);
-    List<Long> remainingIndices = indices.subList(0, 3);
-    // check snapshot file and its md5 file management
-    for (File file : remainingFiles) {
-      System.out.println(file.getName());
-      Matcher matcher = 
SimpleStateMachineStorage.SNAPSHOT_REGEX.matcher(file.getName());
-      if (matcher.matches()) {
-        
Assert.assertTrue(remainingIndices.contains(Long.parseLong(matcher.group(2))));
-        Assert.assertTrue(new File(file.getAbsolutePath() + ".md5").exists());
-      }
-    }
-
-    // Attempt to clean up again should not delete any more files.
-    simpleStateMachineStorage.cleanupOldSnapshots(snapshotRetentionPolicy);
-    remainingFiles = stateMachineDir.listFiles();
-    Assert.assertTrue(remainingFiles.length == 7);
-
-    // Test with Retention disabled.
-    // Create 2 snapshot files in storage dir.
-    for (int i = 0; i < 2; i++) {
-      final long term = ThreadLocalRandom.current().nextLong(10L);
-      final long index = ThreadLocalRandom.current().nextLong(1000L);
-      indices.add(index);
-      File snapshotFile = simpleStateMachineStorage.getSnapshotFile(term, 
index);
-      snapshotFile.createNewFile();
-      File md5File = new File(snapshotFile.getAbsolutePath() + ".md5");
-      md5File.createNewFile();
-    }
-
-    simpleStateMachineStorage.cleanupOldSnapshots(new 
SnapshotRetentionPolicy() {});
-
-    Assert.assertTrue(stateMachineDir.listFiles().length == 11);
-  }
-
   @Test
   public void testObjSerde() throws IOException, InterruptedException {
     CelebornConf conf = new CelebornConf();

Reply via email to