This is an automated email from the ASF dual-hosted git repository.

siyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new b25410d691 HDDS-8069. [Snapshot] Compaction DAG reconstruction logic potentially crashing OM on startup (#4401)
b25410d691 is described below

commit b25410d691edd9388a9c426f7c8e1f0fd2b7da03
Author: Hemant Kumar <[email protected]>
AuthorDate: Thu Mar 16 02:41:59 2023 -0700

    HDDS-8069. [Snapshot] Compaction DAG reconstruction logic potentially crashing OM on startup (#4401)
---
 .../org/apache/hadoop/hdds/utils/db/RDBStore.java  |   4 +-
 .../ozone/rocksdiff/RocksDBCheckpointDiffer.java   |  54 +++---
 .../rocksdiff/TestRocksDBCheckpointDiffer.java     | 204 +++++++++------------
 3 files changed, 115 insertions(+), 147 deletions(-)

diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
index d354aedc81..bb647bce35 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
@@ -106,8 +106,8 @@ public class RDBStore implements DBStore {
       if (enableCompactionLog) {
         rocksDBCheckpointDiffer = new RocksDBCheckpointDiffer(
             dbLocation.getParent(), dbCompactionSSTBackupDirName,
-            dbCompactionLogDirName, dbLocation, maxTimeAllowedForSnapshotInDag,
-            compactionDagDaemonInterval);
+            dbCompactionLogDirName, dbLocation.toString(),
+            maxTimeAllowedForSnapshotInDag, compactionDagDaemonInterval);
         rocksDBCheckpointDiffer.setRocksDBForCompactionTracking(dbOptions);
       } else {
         rocksDBCheckpointDiffer = null;
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
index e48340373f..f64ab5aece 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
@@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.graph.GraphBuilder;
 import com.google.common.graph.MutableGraph;
+import java.io.FileNotFoundException;
 import java.util.Collections;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -170,32 +171,37 @@ public class RocksDBCheckpointDiffer implements AutoCloseable {
    * Note that previous compaction logs are loaded by RDBStore after this
    * object's initialization by calling loadAllCompactionLogs().
    *
-   * @param metadataDir Ozone metadata directory.
-   * @param sstBackupDir Name of the SST backup dir under metadata dir.
+   * @param metadataDirName Ozone metadata directory.
+   * @param sstBackupDirName Name of the SST backup dir under metadata dir.
    * @param compactionLogDirName Name of the compaction log dir.
-   * @param activeDBLocation Active RocksDB directory's location.
+   * @param activeDBLocationName Active RocksDB directory's location.
    * @param maxTimeAllowedForSnapshotInDagInMs Time after which snapshot will be
    *                                           pruned from the DAG by daemon.
    * @param pruneCompactionDagDaemonRunIntervalInMs Interval at which DAG
    *                                               pruning daemon will run.
    */
-  public RocksDBCheckpointDiffer(String metadataDir,
-                                 String sstBackupDir,
+  public RocksDBCheckpointDiffer(String metadataDirName,
+                                 String sstBackupDirName,
                                  String compactionLogDirName,
-                                 File activeDBLocation,
+                                 String activeDBLocationName,
                                  long maxTimeAllowedForSnapshotInDagInMs,
                                  long pruneCompactionDagDaemonRunIntervalInMs) {
+    Preconditions.checkNotNull(metadataDirName);
+    Preconditions.checkNotNull(sstBackupDirName);
+    Preconditions.checkNotNull(compactionLogDirName);
+    Preconditions.checkNotNull(activeDBLocationName);
+
     this.compactionLogDir =
-        createCompactionLogDir(metadataDir, compactionLogDirName);
-    this.sstBackupDir = Paths.get(metadataDir, sstBackupDir) + "/";
+        createCompactionLogDir(metadataDirName, compactionLogDirName);
+    this.sstBackupDir = Paths.get(metadataDirName, sstBackupDirName) + "/";
     createSstBackUpDir();
 
     // Active DB location is used in getSSTFileSummary
-    this.activeDBLocationStr = activeDBLocation.toString() + "/";
+    this.activeDBLocationStr = activeDBLocationName + "/";
     this.maxAllowedTimeInDag = maxTimeAllowedForSnapshotInDagInMs;
-    this.executor = Executors.newSingleThreadScheduledExecutor();
 
     if (pruneCompactionDagDaemonRunIntervalInMs > 0) {
+      this.executor = Executors.newSingleThreadScheduledExecutor();
       this.executor.scheduleWithFixedDelay(
           this::pruneOlderSnapshotsWithCompactionHistory,
           pruneCompactionDagDaemonRunIntervalInMs,
@@ -208,20 +214,11 @@ public class RocksDBCheckpointDiffer implements AutoCloseable {
           pruneCompactionDagDaemonRunIntervalInMs,
           TimeUnit.MILLISECONDS
       );
+    } else {
+      this.executor = null;
     }
   }
 
-  public RocksDBCheckpointDiffer(String sstBackupDir,
-                                 String compactionLogDirName,
-                                 String activeDBLocationName,
-                                 long maxTimeAllowedForSnapshotInDagInMs) {
-    this.compactionLogDir = compactionLogDirName;
-    this.sstBackupDir = sstBackupDir;
-    this.activeDBLocationStr = activeDBLocationName;
-    this.maxAllowedTimeInDag = maxTimeAllowedForSnapshotInDagInMs;
-    this.executor = null;
-  }
-
   private String createCompactionLogDir(String metadataDir,
                                         String compactionLogDirName) {
 
@@ -522,12 +519,8 @@ public class RocksDBCheckpointDiffer implements AutoCloseable {
    * @param filename SST filename
    * @return number of keys
    */
-  private long getSSTFileSummary(String filename) throws RocksDBException {
-
-    if (activeDBLocationStr == null) {
-      // For testing only
-      return 1L;
-    }
+  private long getSSTFileSummary(String filename)
+      throws RocksDBException, FileNotFoundException {
 
     if (!filename.endsWith(SST_FILE_EXTENSION)) {
       filename += SST_FILE_EXTENSION;
@@ -545,7 +538,8 @@ public class RocksDBCheckpointDiffer implements AutoCloseable {
     return properties.getNumEntries();
   }
 
-  private String getAbsoluteSstFilePath(String filename) {
+  private String getAbsoluteSstFilePath(String filename)
+      throws FileNotFoundException {
     if (!filename.endsWith(SST_FILE_EXTENSION)) {
       filename += SST_FILE_EXTENSION;
     }
@@ -556,7 +550,7 @@ public class RocksDBCheckpointDiffer implements AutoCloseable {
     } else if (sstFileInActiveDB.exists()) {
       return activeDBLocationStr + filename;
     } else {
-      throw new RuntimeException("Can't find SST file: " + filename);
+      throw new FileNotFoundException("Can't find SST file: " + filename);
     }
   }
 
@@ -986,6 +980,8 @@ public class RocksDBCheckpointDiffer implements AutoCloseable {
       numKeys = getSSTFileSummary(file);
     } catch (RocksDBException e) {
       LOG.warn("Can't get num of keys in SST '{}': {}", file, e.getMessage());
+    } catch (FileNotFoundException e) {
+      LOG.info("Can't find SST '{}'", file, e);
     }
     CompactionNode fileNode = new CompactionNode(
         file, snapshotID, numKeys, seqNum);
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
index f634222c63..c1ff195c77 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
@@ -32,9 +32,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Queue;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -46,6 +44,7 @@ import com.google.common.graph.MutableGraph;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.NodeComparator;
 import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -84,10 +83,6 @@ public class TestRocksDBCheckpointDiffer {
   private static final Logger LOG =
       LoggerFactory.getLogger(TestRocksDBCheckpointDiffer.class);
 
-  /**
-   * RocksDB path for the test.
-   */
-  private static final String TEST_DB_PATH = "./rocksdb-data";
   private static final int NUM_ROW = 250000;
   private static final int SNAPSHOT_EVERY_SO_MANY_KEYS = 49999;
 
@@ -97,12 +92,53 @@ public class TestRocksDBCheckpointDiffer {
   private static final String CP_PATH_PREFIX = "rocksdb-cp-";
   private final List<DifferSnapshotInfo> snapshots = new ArrayList<>();
 
+  private final String activeDbDirName = "./rocksdb-data";
+  private final String metadataDirName = "./metadata";
+  private final String compactionLogDirName = "compaction-log";
+  private final String sstBackUpDirName = "compaction-sst-backup";
+  private File activeDbDir;
+  private File metadataDirDir;
+  private File compactionLogDir;
+  private File sstBackUpDir;
+
   @BeforeEach
   public void init() {
     // Checkpoint differ log level. Set to DEBUG for verbose output
     GenericTestUtils.setLogLevel(RocksDBCheckpointDiffer.getLog(), Level.INFO);
     // Test class log level. Set to DEBUG for verbose output
     GenericTestUtils.setLogLevel(TestRocksDBCheckpointDiffer.LOG, Level.INFO);
+
+    activeDbDir = new File(activeDbDirName);
+    createDir(activeDbDir, activeDbDirName);
+
+    metadataDirDir = new File(metadataDirName);
+    createDir(metadataDirDir, metadataDirName);
+
+    compactionLogDir = new File(metadataDirName, compactionLogDirName);
+    createDir(compactionLogDir, metadataDirName + "/" + compactionLogDirName);
+
+    sstBackUpDir = new File(metadataDirName, sstBackUpDirName);
+    createDir(sstBackUpDir, metadataDirName + "/" + sstBackUpDirName);
+  }
+
+  private void createDir(File file, String filePath) {
+    // Remove the directory if it already exists.
+    if (file.exists()) {
+      deleteDirectory(file);
+    }
+
+    // Create new Dir.
+    if (!file.mkdirs()) {
+      fail("Error in creating directory: " + filePath);
+    }
+  }
+
+  @AfterEach
+  public void cleanUp() {
+    deleteDirectory(compactionLogDir);
+    deleteDirectory(sstBackUpDir);
+    deleteDirectory(metadataDirDir);
+    deleteDirectory(activeDbDir);
   }
 
   /**
@@ -206,7 +242,12 @@ public class TestRocksDBCheckpointDiffer {
       boolean expectingException) {
 
     RocksDBCheckpointDiffer differ =
-        new RocksDBCheckpointDiffer(null, null, null, 0L);
+        new RocksDBCheckpointDiffer(metadataDirName,
+            sstBackUpDirName,
+            compactionLogDirName,
+            activeDbDirName,
+            0L,
+            0L);
     boolean exceptionThrown = false;
     long createdTime = System.currentTimeMillis();
 
@@ -274,32 +315,17 @@ public class TestRocksDBCheckpointDiffer {
    */
   @Test
   void testDifferWithDB() throws Exception {
+    RocksDBCheckpointDiffer differ =
+        new RocksDBCheckpointDiffer(metadataDirName,
+            sstBackUpDirName,
+            compactionLogDirName,
+            activeDbDirName,
+            TimeUnit.DAYS.toMillis(1),
+            MINUTES.toMillis(5));
 
-    final String clDirStr = "compaction-log";
-    // Delete the compaction log dir for the test, if it exists
-    File clDir = new File(clDirStr);
-    if (clDir.exists()) {
-      deleteDirectory(clDir);
-    }
-
-    final String metadataDirStr = ".";
-    final String sstDirStr = "compaction-sst-backup";
-
-    final File dbLocation = new File(TEST_DB_PATH);
-    RocksDBCheckpointDiffer differ = new RocksDBCheckpointDiffer(
-        metadataDirStr, sstDirStr, clDirStr, dbLocation,
-        TimeUnit.DAYS.toMillis(1),
-        MINUTES.toMillis(5));
-
-    // Empty the SST backup folder first for testing
-    File sstDir = new File(sstDirStr);
-    deleteDirectory(sstDir);
-    if (!sstDir.mkdir()) {
-      fail("Unable to create SST backup directory");
-    }
-
-    RocksDB rocksDB = createRocksDBInstanceAndWriteKeys(TEST_DB_PATH, differ);
-    readRocksDBInstance(TEST_DB_PATH, rocksDB, null, differ);
+    RocksDB rocksDB =
+        createRocksDBInstanceAndWriteKeys(activeDbDirName, differ);
+    readRocksDBInstance(activeDbDirName, rocksDB, null, differ);
 
     if (LOG.isDebugEnabled()) {
       printAllSnapshots();
@@ -312,7 +338,7 @@ public class TestRocksDBCheckpointDiffer {
     diffAllSnapshots(differ);
 
     // Confirm correct links created
-    try (Stream<Path> sstPathStream = Files.list(sstDir.toPath())) {
+    try (Stream<Path> sstPathStream = Files.list(sstBackUpDir.toPath())) {
       List<String> expectedLinks = sstPathStream.map(Path::getFileName)
               .map(Object::toString).sorted().collect(Collectors.toList());
       Assertions.assertEquals(expectedLinks, asList(
@@ -379,7 +405,7 @@ public class TestRocksDBCheckpointDiffer {
 
     final long dbLatestSequenceNumber = rocksDB.getLatestSequenceNumber();
 
-    createCheckPoint(TEST_DB_PATH, cpPath, rocksDB);
+    createCheckPoint(activeDbDirName, cpPath, rocksDB);
     final String snapshotId = "snap_id_" + snapshotGeneration;
     final DifferSnapshotInfo currentSnapshot =
         new DifferSnapshotInfo(cpPath, snapshotId, snapshotGeneration, null);
@@ -632,45 +658,6 @@ public class TestRocksDBCheckpointDiffer {
     }
   }
 
-  private void printMutableGraph(String srcSnapId, String destSnapId,
-      MutableGraph<CompactionNode> mutableGraph) {
-
-    LOG.debug("Gathering all SST file nodes from src '{}' to dest '{}'",
-        srcSnapId, destSnapId);
-
-    final Queue<CompactionNode> nodeQueue = new LinkedList<>();
-    // Queue source snapshot SST file nodes
-    for (CompactionNode node : mutableGraph.nodes()) {
-      if (srcSnapId == null ||
-          node.getSnapshotId().compareToIgnoreCase(srcSnapId) == 0) {
-        nodeQueue.add(node);
-      }
-    }
-
-    final Set<CompactionNode> allNodesSet = new HashSet<>();
-    while (!nodeQueue.isEmpty()) {
-      CompactionNode node = nodeQueue.poll();
-      Set<CompactionNode> succSet = mutableGraph.successors(node);
-      LOG.debug("Current node: {}", node);
-      if (succSet.isEmpty()) {
-        LOG.debug("Has no successor node");
-        allNodesSet.add(node);
-        continue;
-      }
-      for (CompactionNode succNode : succSet) {
-        LOG.debug("Has successor node: {}", succNode);
-        if (srcSnapId == null ||
-            succNode.getSnapshotId().compareToIgnoreCase(destSnapId) == 0) {
-          allNodesSet.add(succNode);
-          continue;
-        }
-        nodeQueue.add(succNode);
-      }
-    }
-
-    LOG.debug("Files are: {}", allNodesSet);
-  }
-
   private static final List<List<String>> SST_FILES_BY_LEVEL = Arrays.asList(
       Arrays.asList("000015", "000013", "000011", "000009"),
       Arrays.asList("000018", "000016", "000017", "000026", "000024", "000022",
@@ -830,7 +817,12 @@ public class TestRocksDBCheckpointDiffer {
                                    Set<String> expectedFileNodesRemoved) {
 
     RocksDBCheckpointDiffer differ =
-        new RocksDBCheckpointDiffer(null, null, null, 0L);
+        new RocksDBCheckpointDiffer(metadataDirName,
+            sstBackUpDirName,
+            compactionLogDirName,
+            activeDbDirName,
+            0L,
+            0L);
     Set<String> actualFileNodesRemoved =
         differ.pruneBackwardDag(originalDag, levelToBeRemoved);
     Assertions.assertEquals(expectedDag, originalDag);
@@ -888,7 +880,12 @@ public class TestRocksDBCheckpointDiffer {
                                   Set<String> expectedFileNodesRemoved) {
 
     RocksDBCheckpointDiffer differ =
-        new RocksDBCheckpointDiffer(null, null, null, 0L);
+        new RocksDBCheckpointDiffer(metadataDirName,
+            sstBackUpDirName,
+            compactionLogDirName,
+            activeDbDirName,
+            0L,
+            0L);
     Set<String> actualFileNodesRemoved =
         differ.pruneForwardDag(originalDag, levelToBeRemoved);
     Assertions.assertEquals(expectedDag, originalDag);
@@ -1053,23 +1050,11 @@ public class TestRocksDBCheckpointDiffer {
       Set<String> expectedNodes,
       int expectedNumberOfLogFilesDeleted
   ) throws IOException {
-    String compactionLogDirName = "./test-compaction-log";
-    File compactionLogDir = new File(compactionLogDirName);
-    if (!compactionLogDir.exists() && !compactionLogDir.mkdirs()) {
-      fail("Error creating compaction log directory: " + compactionLogDirName);
-    }
-
-    String sstBackUpDirName = "./test-compaction-sst-backup";
-    File sstBackUpDir = new File(sstBackUpDirName);
-    if (!sstBackUpDir.exists() && !sstBackUpDir.mkdirs()) {
-      fail("Error creating SST backup directory: " + sstBackUpDirName);
-    }
-
     List<File> filesCreated = new ArrayList<>();
 
     for (int i = 0; i < compactionLogs.size(); i++) {
-      String compactionFileName =
-          compactionLogDirName + "/0000" + i + COMPACTION_LOG_FILE_NAME_SUFFIX;
+      String compactionFileName = metadataDirName + "/" + compactionLogDirName
+          + "/0000" + i + COMPACTION_LOG_FILE_NAME_SUFFIX;
       File compactionFile = new File(compactionFileName);
       Files.write(compactionFile.toPath(),
           compactionLogs.get(i).getBytes(UTF_8));
@@ -1077,10 +1062,12 @@ public class TestRocksDBCheckpointDiffer {
     }
 
     RocksDBCheckpointDiffer differ =
-        new RocksDBCheckpointDiffer(sstBackUpDirName,
+        new RocksDBCheckpointDiffer(metadataDirName,
+            sstBackUpDirName,
             compactionLogDirName,
-            null,
-            MINUTES.toMillis(10));
+            activeDbDirName,
+            MINUTES.toMillis(10),
+            0L);
 
     differ.loadAllCompactionLogs();
 
@@ -1111,9 +1098,6 @@ public class TestRocksDBCheckpointDiffer {
       File compactionFile = filesCreated.get(i);
       assertTrue(compactionFile.exists());
     }
-
-    deleteDirectory(compactionLogDir);
-    deleteDirectory(sstBackUpDir);
   }
 
   private static Stream<Arguments> sstFilePruningScenarios() {
@@ -1164,21 +1148,8 @@ public class TestRocksDBCheckpointDiffer {
       List<String> initialFiles,
       List<String> expectedFiles
   ) throws IOException {
-
-    String sstBackUpDirName = "./test-compaction-sst-backup";
-    File sstBackUpDir = new File(sstBackUpDirName);
-    if (!sstBackUpDir.exists() && !sstBackUpDir.mkdirs()) {
-      fail("Error creating SST backup directory: " + sstBackUpDirName);
-    }
-
-    String compactionLogDirName = "./test-compaction-log";
-    File compactionLogDir = new File(compactionLogDirName);
-    if (!compactionLogDir.exists() && !compactionLogDir.mkdirs()) {
-      fail("Error creating compaction log directory: " + compactionLogDirName);
-    }
-
-    createFileWithContext(compactionLogDirName + "/compaction_log" +
-            COMPACTION_LOG_FILE_NAME_SUFFIX,
+    createFileWithContext(metadataDirName + "/" + compactionLogDirName
+            + "/compaction_log" + COMPACTION_LOG_FILE_NAME_SUFFIX,
         compactionLog);
 
     for (String fileName : initialFiles) {
@@ -1187,16 +1158,19 @@ public class TestRocksDBCheckpointDiffer {
     }
 
     RocksDBCheckpointDiffer differ =
-        new RocksDBCheckpointDiffer(sstBackUpDirName,
+        new RocksDBCheckpointDiffer(metadataDirName,
+            sstBackUpDirName,
             compactionLogDirName,
-            null,
-            MINUTES.toMillis(10));
+            activeDbDirName,
+            MINUTES.toMillis(10),
+            0L);
 
     differ.loadAllCompactionLogs();
     differ.pruneSstFiles();
 
     Set<String> actualFileSetAfterPruning;
-    try (Stream<Path> pathStream = Files.list(Paths.get(sstBackUpDirName))
+    try (Stream<Path> pathStream = Files.list(
+            Paths.get(metadataDirName + "/" + sstBackUpDirName))
         .filter(e -> e.toString().toLowerCase()
             .endsWith(SST_FILE_EXTENSION))
         .sorted()) {
@@ -1209,8 +1183,6 @@ public class TestRocksDBCheckpointDiffer {
 
     Set<String> expectedFileSet = new HashSet<>(expectedFiles);
     assertEquals(expectedFileSet, actualFileSetAfterPruning);
-    deleteDirectory(compactionLogDir);
-    deleteDirectory(sstBackUpDir);
   }
 
   private void createFileWithContext(String fileName, String context)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to