This is an automated email from the ASF dual-hosted git repository.
siyao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new b25410d691 HDDS-8069. [Snapshot] Compaction DAG reconstruction logic
potentially crashing OM on startup (#4401)
b25410d691 is described below
commit b25410d691edd9388a9c426f7c8e1f0fd2b7da03
Author: Hemant Kumar <[email protected]>
AuthorDate: Thu Mar 16 02:41:59 2023 -0700
HDDS-8069. [Snapshot] Compaction DAG reconstruction logic potentially
crashing OM on startup (#4401)
---
.../org/apache/hadoop/hdds/utils/db/RDBStore.java | 4 +-
.../ozone/rocksdiff/RocksDBCheckpointDiffer.java | 54 +++---
.../rocksdiff/TestRocksDBCheckpointDiffer.java | 204 +++++++++------------
3 files changed, 115 insertions(+), 147 deletions(-)
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
index d354aedc81..bb647bce35 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
@@ -106,8 +106,8 @@ public class RDBStore implements DBStore {
if (enableCompactionLog) {
rocksDBCheckpointDiffer = new RocksDBCheckpointDiffer(
dbLocation.getParent(), dbCompactionSSTBackupDirName,
- dbCompactionLogDirName, dbLocation, maxTimeAllowedForSnapshotInDag,
- compactionDagDaemonInterval);
+ dbCompactionLogDirName, dbLocation.toString(),
+ maxTimeAllowedForSnapshotInDag, compactionDagDaemonInterval);
rocksDBCheckpointDiffer.setRocksDBForCompactionTracking(dbOptions);
} else {
rocksDBCheckpointDiffer = null;
diff --git
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
index e48340373f..f64ab5aece 100644
---
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
+++
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
@@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.graph.GraphBuilder;
import com.google.common.graph.MutableGraph;
+import java.io.FileNotFoundException;
import java.util.Collections;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -170,32 +171,37 @@ public class RocksDBCheckpointDiffer implements
AutoCloseable {
* Note that previous compaction logs are loaded by RDBStore after this
* object's initialization by calling loadAllCompactionLogs().
*
- * @param metadataDir Ozone metadata directory.
- * @param sstBackupDir Name of the SST backup dir under metadata dir.
+ * @param metadataDirName Ozone metadata directory.
+ * @param sstBackupDirName Name of the SST backup dir under metadata dir.
* @param compactionLogDirName Name of the compaction log dir.
- * @param activeDBLocation Active RocksDB directory's location.
+ * @param activeDBLocationName Active RocksDB directory's location.
* @param maxTimeAllowedForSnapshotInDagInMs Time after which snapshot will be
* pruned from the DAG by daemon.
* @param pruneCompactionDagDaemonRunIntervalInMs Interval at which DAG
* pruning daemon will run.
*/
- public RocksDBCheckpointDiffer(String metadataDir,
- String sstBackupDir,
+ public RocksDBCheckpointDiffer(String metadataDirName,
+ String sstBackupDirName,
String compactionLogDirName,
- File activeDBLocation,
+ String activeDBLocationName,
long maxTimeAllowedForSnapshotInDagInMs,
long pruneCompactionDagDaemonRunIntervalInMs)
{
+ Preconditions.checkNotNull(metadataDirName);
+ Preconditions.checkNotNull(sstBackupDirName);
+ Preconditions.checkNotNull(compactionLogDirName);
+ Preconditions.checkNotNull(activeDBLocationName);
+
this.compactionLogDir =
- createCompactionLogDir(metadataDir, compactionLogDirName);
- this.sstBackupDir = Paths.get(metadataDir, sstBackupDir) + "/";
+ createCompactionLogDir(metadataDirName, compactionLogDirName);
+ this.sstBackupDir = Paths.get(metadataDirName, sstBackupDirName) + "/";
createSstBackUpDir();
// Active DB location is used in getSSTFileSummary
- this.activeDBLocationStr = activeDBLocation.toString() + "/";
+ this.activeDBLocationStr = activeDBLocationName + "/";
this.maxAllowedTimeInDag = maxTimeAllowedForSnapshotInDagInMs;
- this.executor = Executors.newSingleThreadScheduledExecutor();
if (pruneCompactionDagDaemonRunIntervalInMs > 0) {
+ this.executor = Executors.newSingleThreadScheduledExecutor();
this.executor.scheduleWithFixedDelay(
this::pruneOlderSnapshotsWithCompactionHistory,
pruneCompactionDagDaemonRunIntervalInMs,
@@ -208,20 +214,11 @@ public class RocksDBCheckpointDiffer implements
AutoCloseable {
pruneCompactionDagDaemonRunIntervalInMs,
TimeUnit.MILLISECONDS
);
+ } else {
+ this.executor = null;
}
}
- public RocksDBCheckpointDiffer(String sstBackupDir,
- String compactionLogDirName,
- String activeDBLocationName,
- long maxTimeAllowedForSnapshotInDagInMs) {
- this.compactionLogDir = compactionLogDirName;
- this.sstBackupDir = sstBackupDir;
- this.activeDBLocationStr = activeDBLocationName;
- this.maxAllowedTimeInDag = maxTimeAllowedForSnapshotInDagInMs;
- this.executor = null;
- }
-
private String createCompactionLogDir(String metadataDir,
String compactionLogDirName) {
@@ -522,12 +519,8 @@ public class RocksDBCheckpointDiffer implements
AutoCloseable {
* @param filename SST filename
* @return number of keys
*/
- private long getSSTFileSummary(String filename) throws RocksDBException {
-
- if (activeDBLocationStr == null) {
- // For testing only
- return 1L;
- }
+ private long getSSTFileSummary(String filename)
+ throws RocksDBException, FileNotFoundException {
if (!filename.endsWith(SST_FILE_EXTENSION)) {
filename += SST_FILE_EXTENSION;
@@ -545,7 +538,8 @@ public class RocksDBCheckpointDiffer implements
AutoCloseable {
return properties.getNumEntries();
}
- private String getAbsoluteSstFilePath(String filename) {
+ private String getAbsoluteSstFilePath(String filename)
+ throws FileNotFoundException {
if (!filename.endsWith(SST_FILE_EXTENSION)) {
filename += SST_FILE_EXTENSION;
}
@@ -556,7 +550,7 @@ public class RocksDBCheckpointDiffer implements
AutoCloseable {
} else if (sstFileInActiveDB.exists()) {
return activeDBLocationStr + filename;
} else {
- throw new RuntimeException("Can't find SST file: " + filename);
+ throw new FileNotFoundException("Can't find SST file: " + filename);
}
}
@@ -986,6 +980,8 @@ public class RocksDBCheckpointDiffer implements
AutoCloseable {
numKeys = getSSTFileSummary(file);
} catch (RocksDBException e) {
LOG.warn("Can't get num of keys in SST '{}': {}", file, e.getMessage());
+ } catch (FileNotFoundException e) {
+ LOG.info("Can't find SST '{}'", file, e);
}
CompactionNode fileNode = new CompactionNode(
file, snapshotID, numKeys, seqNum);
diff --git
a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
index f634222c63..c1ff195c77 100644
---
a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
+++
b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
@@ -32,9 +32,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
-import java.util.LinkedList;
import java.util.List;
-import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@@ -46,6 +44,7 @@ import com.google.common.graph.MutableGraph;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.NodeComparator;
import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -84,10 +83,6 @@ public class TestRocksDBCheckpointDiffer {
private static final Logger LOG =
LoggerFactory.getLogger(TestRocksDBCheckpointDiffer.class);
- /**
- * RocksDB path for the test.
- */
- private static final String TEST_DB_PATH = "./rocksdb-data";
private static final int NUM_ROW = 250000;
private static final int SNAPSHOT_EVERY_SO_MANY_KEYS = 49999;
@@ -97,12 +92,53 @@ public class TestRocksDBCheckpointDiffer {
private static final String CP_PATH_PREFIX = "rocksdb-cp-";
private final List<DifferSnapshotInfo> snapshots = new ArrayList<>();
+ private final String activeDbDirName = "./rocksdb-data";
+ private final String metadataDirName = "./metadata";
+ private final String compactionLogDirName = "compaction-log";
+ private final String sstBackUpDirName = "compaction-sst-backup";
+ private File activeDbDir;
+ private File metadataDirDir;
+ private File compactionLogDir;
+ private File sstBackUpDir;
+
@BeforeEach
public void init() {
// Checkpoint differ log level. Set to DEBUG for verbose output
GenericTestUtils.setLogLevel(RocksDBCheckpointDiffer.getLog(), Level.INFO);
// Test class log level. Set to DEBUG for verbose output
GenericTestUtils.setLogLevel(TestRocksDBCheckpointDiffer.LOG, Level.INFO);
+
+ activeDbDir = new File(activeDbDirName);
+ createDir(activeDbDir, activeDbDirName);
+
+ metadataDirDir = new File(metadataDirName);
+ createDir(metadataDirDir, metadataDirName);
+
+ compactionLogDir = new File(metadataDirName, compactionLogDirName);
+ createDir(compactionLogDir, metadataDirName + "/" + compactionLogDirName);
+
+ sstBackUpDir = new File(metadataDirName, sstBackUpDirName);
+ createDir(sstBackUpDir, metadataDirName + "/" + sstBackUpDirName);
+ }
+
+ private void createDir(File file, String filePath) {
+ // Remove the directory if it already exists.
+ if (file.exists()) {
+ deleteDirectory(file);
+ }
+
+ // Create new Dir.
+ if (!file.mkdirs()) {
+ fail("Error in creating directory: " + filePath);
+ }
+ }
+
+ @AfterEach
+ public void cleanUp() {
+ deleteDirectory(compactionLogDir);
+ deleteDirectory(sstBackUpDir);
+ deleteDirectory(metadataDirDir);
+ deleteDirectory(activeDbDir);
}
/**
@@ -206,7 +242,12 @@ public class TestRocksDBCheckpointDiffer {
boolean expectingException) {
RocksDBCheckpointDiffer differ =
- new RocksDBCheckpointDiffer(null, null, null, 0L);
+ new RocksDBCheckpointDiffer(metadataDirName,
+ sstBackUpDirName,
+ compactionLogDirName,
+ activeDbDirName,
+ 0L,
+ 0L);
boolean exceptionThrown = false;
long createdTime = System.currentTimeMillis();
@@ -274,32 +315,17 @@ public class TestRocksDBCheckpointDiffer {
*/
@Test
void testDifferWithDB() throws Exception {
+ RocksDBCheckpointDiffer differ =
+ new RocksDBCheckpointDiffer(metadataDirName,
+ sstBackUpDirName,
+ compactionLogDirName,
+ activeDbDirName,
+ TimeUnit.DAYS.toMillis(1),
+ MINUTES.toMillis(5));
- final String clDirStr = "compaction-log";
- // Delete the compaction log dir for the test, if it exists
- File clDir = new File(clDirStr);
- if (clDir.exists()) {
- deleteDirectory(clDir);
- }
-
- final String metadataDirStr = ".";
- final String sstDirStr = "compaction-sst-backup";
-
- final File dbLocation = new File(TEST_DB_PATH);
- RocksDBCheckpointDiffer differ = new RocksDBCheckpointDiffer(
- metadataDirStr, sstDirStr, clDirStr, dbLocation,
- TimeUnit.DAYS.toMillis(1),
- MINUTES.toMillis(5));
-
- // Empty the SST backup folder first for testing
- File sstDir = new File(sstDirStr);
- deleteDirectory(sstDir);
- if (!sstDir.mkdir()) {
- fail("Unable to create SST backup directory");
- }
-
- RocksDB rocksDB = createRocksDBInstanceAndWriteKeys(TEST_DB_PATH, differ);
- readRocksDBInstance(TEST_DB_PATH, rocksDB, null, differ);
+ RocksDB rocksDB =
+ createRocksDBInstanceAndWriteKeys(activeDbDirName, differ);
+ readRocksDBInstance(activeDbDirName, rocksDB, null, differ);
if (LOG.isDebugEnabled()) {
printAllSnapshots();
@@ -312,7 +338,7 @@ public class TestRocksDBCheckpointDiffer {
diffAllSnapshots(differ);
// Confirm correct links created
- try (Stream<Path> sstPathStream = Files.list(sstDir.toPath())) {
+ try (Stream<Path> sstPathStream = Files.list(sstBackUpDir.toPath())) {
List<String> expectedLinks = sstPathStream.map(Path::getFileName)
.map(Object::toString).sorted().collect(Collectors.toList());
Assertions.assertEquals(expectedLinks, asList(
@@ -379,7 +405,7 @@ public class TestRocksDBCheckpointDiffer {
final long dbLatestSequenceNumber = rocksDB.getLatestSequenceNumber();
- createCheckPoint(TEST_DB_PATH, cpPath, rocksDB);
+ createCheckPoint(activeDbDirName, cpPath, rocksDB);
final String snapshotId = "snap_id_" + snapshotGeneration;
final DifferSnapshotInfo currentSnapshot =
new DifferSnapshotInfo(cpPath, snapshotId, snapshotGeneration, null);
@@ -632,45 +658,6 @@ public class TestRocksDBCheckpointDiffer {
}
}
- private void printMutableGraph(String srcSnapId, String destSnapId,
- MutableGraph<CompactionNode> mutableGraph) {
-
- LOG.debug("Gathering all SST file nodes from src '{}' to dest '{}'",
- srcSnapId, destSnapId);
-
- final Queue<CompactionNode> nodeQueue = new LinkedList<>();
- // Queue source snapshot SST file nodes
- for (CompactionNode node : mutableGraph.nodes()) {
- if (srcSnapId == null ||
- node.getSnapshotId().compareToIgnoreCase(srcSnapId) == 0) {
- nodeQueue.add(node);
- }
- }
-
- final Set<CompactionNode> allNodesSet = new HashSet<>();
- while (!nodeQueue.isEmpty()) {
- CompactionNode node = nodeQueue.poll();
- Set<CompactionNode> succSet = mutableGraph.successors(node);
- LOG.debug("Current node: {}", node);
- if (succSet.isEmpty()) {
- LOG.debug("Has no successor node");
- allNodesSet.add(node);
- continue;
- }
- for (CompactionNode succNode : succSet) {
- LOG.debug("Has successor node: {}", succNode);
- if (srcSnapId == null ||
- succNode.getSnapshotId().compareToIgnoreCase(destSnapId) == 0) {
- allNodesSet.add(succNode);
- continue;
- }
- nodeQueue.add(succNode);
- }
- }
-
- LOG.debug("Files are: {}", allNodesSet);
- }
-
private static final List<List<String>> SST_FILES_BY_LEVEL = Arrays.asList(
Arrays.asList("000015", "000013", "000011", "000009"),
Arrays.asList("000018", "000016", "000017", "000026", "000024", "000022",
@@ -830,7 +817,12 @@ public class TestRocksDBCheckpointDiffer {
Set<String> expectedFileNodesRemoved) {
RocksDBCheckpointDiffer differ =
- new RocksDBCheckpointDiffer(null, null, null, 0L);
+ new RocksDBCheckpointDiffer(metadataDirName,
+ sstBackUpDirName,
+ compactionLogDirName,
+ activeDbDirName,
+ 0L,
+ 0L);
Set<String> actualFileNodesRemoved =
differ.pruneBackwardDag(originalDag, levelToBeRemoved);
Assertions.assertEquals(expectedDag, originalDag);
@@ -888,7 +880,12 @@ public class TestRocksDBCheckpointDiffer {
Set<String> expectedFileNodesRemoved) {
RocksDBCheckpointDiffer differ =
- new RocksDBCheckpointDiffer(null, null, null, 0L);
+ new RocksDBCheckpointDiffer(metadataDirName,
+ sstBackUpDirName,
+ compactionLogDirName,
+ activeDbDirName,
+ 0L,
+ 0L);
Set<String> actualFileNodesRemoved =
differ.pruneForwardDag(originalDag, levelToBeRemoved);
Assertions.assertEquals(expectedDag, originalDag);
@@ -1053,23 +1050,11 @@ public class TestRocksDBCheckpointDiffer {
Set<String> expectedNodes,
int expectedNumberOfLogFilesDeleted
) throws IOException {
- String compactionLogDirName = "./test-compaction-log";
- File compactionLogDir = new File(compactionLogDirName);
- if (!compactionLogDir.exists() && !compactionLogDir.mkdirs()) {
- fail("Error creating compaction log directory: " + compactionLogDirName);
- }
-
- String sstBackUpDirName = "./test-compaction-sst-backup";
- File sstBackUpDir = new File(sstBackUpDirName);
- if (!sstBackUpDir.exists() && !sstBackUpDir.mkdirs()) {
- fail("Error creating SST backup directory: " + sstBackUpDirName);
- }
-
List<File> filesCreated = new ArrayList<>();
for (int i = 0; i < compactionLogs.size(); i++) {
- String compactionFileName =
- compactionLogDirName + "/0000" + i + COMPACTION_LOG_FILE_NAME_SUFFIX;
+ String compactionFileName = metadataDirName + "/" + compactionLogDirName
+ + "/0000" + i + COMPACTION_LOG_FILE_NAME_SUFFIX;
File compactionFile = new File(compactionFileName);
Files.write(compactionFile.toPath(),
compactionLogs.get(i).getBytes(UTF_8));
@@ -1077,10 +1062,12 @@ public class TestRocksDBCheckpointDiffer {
}
RocksDBCheckpointDiffer differ =
- new RocksDBCheckpointDiffer(sstBackUpDirName,
+ new RocksDBCheckpointDiffer(metadataDirName,
+ sstBackUpDirName,
compactionLogDirName,
- null,
- MINUTES.toMillis(10));
+ activeDbDirName,
+ MINUTES.toMillis(10),
+ 0L);
differ.loadAllCompactionLogs();
@@ -1111,9 +1098,6 @@ public class TestRocksDBCheckpointDiffer {
File compactionFile = filesCreated.get(i);
assertTrue(compactionFile.exists());
}
-
- deleteDirectory(compactionLogDir);
- deleteDirectory(sstBackUpDir);
}
private static Stream<Arguments> sstFilePruningScenarios() {
@@ -1164,21 +1148,8 @@ public class TestRocksDBCheckpointDiffer {
List<String> initialFiles,
List<String> expectedFiles
) throws IOException {
-
- String sstBackUpDirName = "./test-compaction-sst-backup";
- File sstBackUpDir = new File(sstBackUpDirName);
- if (!sstBackUpDir.exists() && !sstBackUpDir.mkdirs()) {
- fail("Error creating SST backup directory: " + sstBackUpDirName);
- }
-
- String compactionLogDirName = "./test-compaction-log";
- File compactionLogDir = new File(compactionLogDirName);
- if (!compactionLogDir.exists() && !compactionLogDir.mkdirs()) {
- fail("Error creating compaction log directory: " + compactionLogDirName);
- }
-
- createFileWithContext(compactionLogDirName + "/compaction_log" +
- COMPACTION_LOG_FILE_NAME_SUFFIX,
+ createFileWithContext(metadataDirName + "/" + compactionLogDirName
+ + "/compaction_log" + COMPACTION_LOG_FILE_NAME_SUFFIX,
compactionLog);
for (String fileName : initialFiles) {
@@ -1187,16 +1158,19 @@ public class TestRocksDBCheckpointDiffer {
}
RocksDBCheckpointDiffer differ =
- new RocksDBCheckpointDiffer(sstBackUpDirName,
+ new RocksDBCheckpointDiffer(metadataDirName,
+ sstBackUpDirName,
compactionLogDirName,
- null,
- MINUTES.toMillis(10));
+ activeDbDirName,
+ MINUTES.toMillis(10),
+ 0L);
differ.loadAllCompactionLogs();
differ.pruneSstFiles();
Set<String> actualFileSetAfterPruning;
- try (Stream<Path> pathStream = Files.list(Paths.get(sstBackUpDirName))
+ try (Stream<Path> pathStream = Files.list(
+ Paths.get(metadataDirName + "/" + sstBackUpDirName))
.filter(e -> e.toString().toLowerCase()
.endsWith(SST_FILE_EXTENSION))
.sorted()) {
@@ -1209,8 +1183,6 @@ public class TestRocksDBCheckpointDiffer {
Set<String> expectedFileSet = new HashSet<>(expectedFiles);
assertEquals(expectedFileSet, actualFileSetAfterPruning);
- deleteDirectory(compactionLogDir);
- deleteDirectory(sstBackUpDir);
}
private void createFileWithContext(String fileName, String context)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]