This is an automated email from the ASF dual-hosted git repository.
hemant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 3d7e786a0c HDDS-8940. Fix for missing SST files on optimized snapDiff code path (#5465)
3d7e786a0c is described below
commit 3d7e786a0cb0ec5a572a6f22eac7ef083b9eb180
Author: Hemant Kumar <[email protected]>
AuthorDate: Fri Oct 27 15:33:29 2023 -0700
HDDS-8940. Fix for missing SST files on optimized snapDiff code path (#5465)
---
.../apache/hadoop/hdds/utils/db/RocksDatabase.java | 74 +++--
.../ozone/compaction/log/CompactionFileInfo.java | 10 +-
.../ozone/compaction/log/CompactionLogEntry.java | 4 +-
.../org/apache/ozone/rocksdiff/CompactionNode.java | 21 +-
.../ozone/rocksdiff/RocksDBCheckpointDiffer.java | 132 ++++++---
.../org/apache/ozone/rocksdiff/RocksDiffUtils.java | 9 +-
.../rocksdiff/TestRocksDBCheckpointDiffer.java | 306 ++++++++++++++++++++-
.../apache/ozone/rocksdiff/TestRocksDiffUtils.java | 57 ++++
.../hadoop/ozone/freon/TestOMSnapshotDAG.java | 23 +-
.../apache/hadoop/ozone/om/OmSnapshotManager.java | 31 +--
.../hadoop/ozone/om/SstFilteringService.java | 64 +----
.../ozone/om/snapshot/SnapshotDiffManager.java | 32 +--
.../hadoop/ozone/om/snapshot/SnapshotUtils.java | 73 +++++
.../hadoop/ozone/om/TestSstFilteringService.java | 24 --
.../snapshot/TestOMSnapshotCreateResponse.java | 5 +-
15 files changed, 620 insertions(+), 245 deletions(-)
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
index ea4fd0b921..5c0aeae5ed 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java
@@ -18,9 +18,7 @@
package org.apache.hadoop.hdds.utils.db;
import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.StringUtils;
-import org.apache.hadoop.hdds.utils.BooleanTriFunction;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
import org.apache.hadoop.hdds.utils.db.managed.ManagedCheckpoint;
import org.apache.hadoop.hdds.utils.db.managed.ManagedColumnFamilyOptions;
@@ -36,6 +34,7 @@ import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksObjectUtils;
import org.apache.hadoop.hdds.utils.db.managed.ManagedTransactionLogIterator;
import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteBatch;
import org.apache.hadoop.hdds.utils.db.managed.ManagedWriteOptions;
+import org.apache.ozone.rocksdiff.RocksDiffUtils;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.Holder;
@@ -49,7 +48,6 @@ import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
@@ -66,6 +64,7 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.hdds.StringUtils.bytes2String;
import static org.apache.hadoop.hdds.utils.db.managed.ManagedColumnFamilyOptions.closeDeeply;
import static org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator.managed;
@@ -616,7 +615,7 @@ public final class RocksDatabase implements Closeable {
assertClose();
for (ColumnFamilyHandle cf : getCfHandleMap().get(db.get().getName())) {
try {
- String table = new String(cf.getName(), StandardCharsets.UTF_8);
+ String table = new String(cf.getName(), UTF_8);
if (cfName.equals(table)) {
return cf;
}
@@ -955,46 +954,45 @@ public final class RocksDatabase implements Closeable {
/**
* Deletes sst files which do not correspond to prefix
* for given table.
- * @param prefixPairs, a list of pair (TableName,prefixUsed).
+ * @param prefixPairs a map of TableName to prefixUsed.
*/
- public void deleteFilesNotMatchingPrefix(
- List<Pair<String, String>> prefixPairs,
- BooleanTriFunction<String, String, String, Boolean> filterFunction)
+ public void deleteFilesNotMatchingPrefix(Map<String, String> prefixPairs)
throws IOException, RocksDBException {
assertClose();
for (LiveFileMetaData liveFileMetaData : getSstFileList()) {
String sstFileColumnFamily =
- new String(liveFileMetaData.columnFamilyName(),
- StandardCharsets.UTF_8);
+ new String(liveFileMetaData.columnFamilyName(), UTF_8);
int lastLevel = getLastLevel();
- for (Pair<String, String> prefixPair : prefixPairs) {
- String columnFamily = prefixPair.getKey();
- String prefixForColumnFamily = prefixPair.getValue();
- if (!sstFileColumnFamily.equals(columnFamily)) {
- continue;
- }
- // RocksDB #deleteFile API allows only to delete the last level of
- // SST Files. Any level < last level won't get deleted and
- // only last file of level 0 can be deleted
- // and will throw warning in the rocksdb manifest.
- // Instead, perform the level check here
- // itself to avoid failed delete attempts for lower level files.
- if (liveFileMetaData.level() != lastLevel || lastLevel == 0) {
- continue;
- }
- String firstDbKey =
- new String(liveFileMetaData.smallestKey(), StandardCharsets.UTF_8);
- String lastDbKey =
- new String(liveFileMetaData.largestKey(), StandardCharsets.UTF_8);
- boolean isKeyWithPrefixPresent =
- filterFunction.apply(firstDbKey, lastDbKey, prefixForColumnFamily);
- if (!isKeyWithPrefixPresent) {
- LOG.info("Deleting sst file {} corresponding to column family"
- + " {} from db: {}", liveFileMetaData.fileName(),
- StringUtils.bytes2String(liveFileMetaData.columnFamilyName()),
- db.get().getName());
- db.deleteFile(liveFileMetaData);
- }
+
+ if (!prefixPairs.containsKey(sstFileColumnFamily)) {
+ continue;
+ }
+
+ // RocksDB #deleteFile API allows only to delete the last level of
+ // SST Files. Any level < last level won't get deleted and
+ // only last file of level 0 can be deleted
+ // and will throw warning in the rocksdb manifest.
+ // Instead, perform the level check here
+ // itself to avoid failed delete attempts for lower level files.
+ if (liveFileMetaData.level() != lastLevel || lastLevel == 0) {
+ continue;
+ }
+
+ String prefixForColumnFamily = prefixPairs.get(sstFileColumnFamily);
+ String firstDbKey = new String(liveFileMetaData.smallestKey(), UTF_8);
+ String lastDbKey = new String(liveFileMetaData.largestKey(), UTF_8);
+ boolean isKeyWithPrefixPresent = RocksDiffUtils.isKeyWithPrefixPresent(
+ prefixForColumnFamily, firstDbKey, lastDbKey);
+ if (!isKeyWithPrefixPresent) {
+ LOG.info("Deleting sst file: {} with start key: {} and end key: {} " +
+ "corresponding to column family {} from db: {}. " +
+ "Prefix for the column family: {}.",
+ liveFileMetaData.fileName(),
+ firstDbKey, lastDbKey,
+ StringUtils.bytes2String(liveFileMetaData.columnFamilyName()),
+ db.get().getName(),
+ prefixForColumnFamily);
+ db.deleteFile(liveFileMetaData);
}
}
}
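
For context on the new filtering path: the old code looped over a List of (table, prefix) Pairs per SST file and delegated the range test to an injected BooleanTriFunction; the new code does a single map lookup and calls RocksDiffUtils.isKeyWithPrefixPresent directly. A minimal standalone sketch of the per-file decision, assuming the two key arguments were already normalized to /volume/bucket/ form (class and method names here are illustrative, not the Ozone API):

    import java.util.Map;

    final class SstPrefixFilterSketch {
      // True when the SST file is safe to delete: its column family is
      // under filtering, it sits on the last level (the only level
      // RocksDB#deleteFile removes cleanly), and its key range cannot
      // contain any key carrying the bucket prefix.
      static boolean canDelete(String columnFamily, int level, int lastLevel,
          String firstBucketPrefix, String lastBucketPrefix,
          Map<String, String> prefixMap) {
        String prefix = prefixMap.get(columnFamily);
        if (prefix == null) {
          return false;  // column family is not being filtered
        }
        if (level != lastLevel || lastLevel == 0) {
          return false;  // lower-level deletes would fail or warn
        }
        boolean mayContainPrefix = firstBucketPrefix.compareTo(prefix) <= 0
            && prefix.compareTo(lastBucketPrefix) <= 0;
        return !mayContainPrefix;
      }
    }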
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/compaction/log/CompactionFileInfo.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/compaction/log/CompactionFileInfo.java
index 68a9363b05..5a633bcc6d 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/compaction/log/CompactionFileInfo.java
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/compaction/log/CompactionFileInfo.java
@@ -18,6 +18,7 @@
package org.apache.ozone.compaction.log;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.util.Preconditions;
@@ -32,10 +33,11 @@ public final class CompactionFileInfo {
private final String endKey;
private final String columnFamily;
- private CompactionFileInfo(String fileName,
- String startRange,
- String endRange,
- String columnFamily) {
+ @VisibleForTesting
+ public CompactionFileInfo(String fileName,
+ String startRange,
+ String endRange,
+ String columnFamily) {
this.fileName = fileName;
this.startKey = startRange;
this.endKey = endRange;
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/compaction/log/CompactionLogEntry.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/compaction/log/CompactionLogEntry.java
index 41a003515d..557aa9e399 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/compaction/log/CompactionLogEntry.java
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/compaction/log/CompactionLogEntry.java
@@ -18,6 +18,7 @@
package org.apache.ozone.compaction.log;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.CompactionLogEntryProto;
import org.apache.hadoop.hdds.utils.db.Codec;
import org.apache.hadoop.hdds.utils.db.CopyObject;
@@ -49,7 +50,8 @@ public final class CompactionLogEntry implements
private final List<CompactionFileInfo> outputFileInfoList;
private final String compactionReason;
- private CompactionLogEntry(long dbSequenceNumber,
+ @VisibleForTesting
+ public CompactionLogEntry(long dbSequenceNumber,
long compactionTime,
List<CompactionFileInfo> inputFileInfoList,
List<CompactionFileInfo> outputFileInfoList,
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/CompactionNode.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/CompactionNode.java
index a7cfa27aaf..6a2767bf40 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/CompactionNode.java
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/CompactionNode.java
@@ -28,6 +28,9 @@ public class CompactionNode {
private final long snapshotGeneration;
private final long totalNumberOfKeys;
private long cumulativeKeysReverseTraversal;
+ private final String startKey;
+ private final String endKey;
+ private final String columnFamily;
/**
* CompactionNode constructor.
@@ -36,12 +39,16 @@ public class CompactionNode {
* @param numKeys Number of keys in the SST
* @param seqNum Snapshot generation (sequence number)
*/
- public CompactionNode(String file, String ssId, long numKeys, long seqNum) {
+ public CompactionNode(String file, String ssId, long numKeys, long seqNum,
+ String startKey, String endKey, String columnFamily) {
fileName = file;
snapshotId = ssId;
totalNumberOfKeys = numKeys;
snapshotGeneration = seqNum;
cumulativeKeysReverseTraversal = 0L;
+ this.startKey = startKey;
+ this.endKey = endKey;
+ this.columnFamily = columnFamily;
}
@Override
@@ -69,6 +76,18 @@ public class CompactionNode {
return cumulativeKeysReverseTraversal;
}
+ public String getStartKey() {
+ return startKey;
+ }
+
+ public String getEndKey() {
+ return endKey;
+ }
+
+ public String getColumnFamily() {
+ return columnFamily;
+ }
+
public void setCumulativeKeysReverseTraversal(
long cumulativeKeysReverseTraversal) {
this.cumulativeKeysReverseTraversal = cumulativeKeysReverseTraversal;
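
Worth noting: the three new fields are deliberately nullable. A hypothetical construction of both shapes, with values borrowed from the test data added later in this patch:

    String snapshotId = java.util.UUID.randomUUID().toString();

    // Node from a new-format compaction log entry: key range and column
    // family are recorded, so the differ can prune it by bucket prefix.
    CompactionNode withRange = new CompactionNode("000081", snapshotId,
        1000L, 256L, "/volume/bucket1/key-0000000730",
        "/volume/bucket2/key-0000099649", "keyTable");

    // Node from a pre-HDDS-8940 log entry (or one whose SST file could not
    // be read): metadata is null, so shouldSkipNode() keeps it and the
    // traversal behaves exactly as before this patch.
    CompactionNode legacy = new CompactionNode("000063", snapshotId,
        1000L, 178L, null, null, null);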
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
index ebf4473a54..4feb1a8f2a 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
@@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.StringUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -746,7 +747,6 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
}
}
-
/**
* Helper to read compaction log file to the internal DAG and compaction log
* table.
@@ -896,22 +896,23 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
* @param dest destination snapshot
* @return A list of SST files without extension. e.g. ["000050", "000060"]
*/
- public synchronized List<String> getSSTDiffList(DifferSnapshotInfo src,
- DifferSnapshotInfo dest)
- throws IOException {
+ public synchronized List<String> getSSTDiffList(
+ DifferSnapshotInfo src,
+ DifferSnapshotInfo dest
+ ) throws IOException {
// TODO: Reject or swap if dest is taken after src, once snapshot chain
// integration is done.
- HashSet<String> srcSnapFiles = readRocksDBLiveFiles(src.getRocksDB());
- HashSet<String> destSnapFiles = readRocksDBLiveFiles(dest.getRocksDB());
+ Set<String> srcSnapFiles = readRocksDBLiveFiles(src.getRocksDB());
+ Set<String> destSnapFiles = readRocksDBLiveFiles(dest.getRocksDB());
- HashSet<String> fwdDAGSameFiles = new HashSet<>();
- HashSet<String> fwdDAGDifferentFiles = new HashSet<>();
+ Set<String> fwdDAGSameFiles = new HashSet<>();
+ Set<String> fwdDAGDifferentFiles = new HashSet<>();
LOG.debug("Doing forward diff from src '{}' to dest '{}'",
src.getDbPath(), dest.getDbPath());
internalGetSSTDiffList(src, dest, srcSnapFiles, destSnapFiles,
- forwardCompactionDAG, fwdDAGSameFiles, fwdDAGDifferentFiles);
+ fwdDAGSameFiles, fwdDAGDifferentFiles);
if (LOG.isDebugEnabled()) {
LOG.debug("Result of diff from src '" + src.getDbPath() + "' to dest '" +
@@ -967,15 +968,20 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
* need further diffing.
*/
synchronized void internalGetSSTDiffList(
- DifferSnapshotInfo src, DifferSnapshotInfo dest,
- Set<String> srcSnapFiles, Set<String> destSnapFiles,
- MutableGraph<CompactionNode> mutableGraph,
- Set<String> sameFiles, Set<String> differentFiles) {
+ DifferSnapshotInfo src,
+ DifferSnapshotInfo dest,
+ Set<String> srcSnapFiles,
+ Set<String> destSnapFiles,
+ Set<String> sameFiles,
+ Set<String> differentFiles) {
- // Sanity check
Preconditions.checkArgument(sameFiles.isEmpty(), "Set must be empty");
Preconditions.checkArgument(differentFiles.isEmpty(), "Set must be empty");
+    // Use the source snapshot's table prefixes. At this point, the source
+    // and target's table prefixes should be the same.
+ Map<String, String> columnFamilyToPrefixMap = src.getTablePrefixes();
+
for (String fileName : srcSnapFiles) {
if (destSnapFiles.contains(fileName)) {
LOG.debug("Source '{}' and destination '{}' share the same SST '{}'",
@@ -1017,7 +1023,7 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
final Set<CompactionNode> nextLevel = new HashSet<>();
for (CompactionNode current : currentLevel) {
- LOG.debug("Processing node: {}", current.getFileName());
+ LOG.debug("Processing node: '{}'", current.getFileName());
if (current.getSnapshotGeneration() < dest.getSnapshotGeneration()) {
LOG.debug("Current node's snapshot generation '{}' "
+ "reached destination snapshot's '{}'. "
@@ -1028,7 +1034,8 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
continue;
}
- Set<CompactionNode> successors = mutableGraph.successors(current);
+ Set<CompactionNode> successors =
+ forwardCompactionDAG.successors(current);
if (successors.isEmpty()) {
LOG.debug("No further compaction happened to the current file. " +
"Src '{}' and dest '{}' have different file: {}",
@@ -1037,24 +1044,34 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
continue;
}
- for (CompactionNode node : successors) {
- if (sameFiles.contains(node.getFileName()) ||
- differentFiles.contains(node.getFileName())) {
- LOG.debug("Skipping known processed SST: {}",
node.getFileName());
+ for (CompactionNode nextNode : successors) {
+ if (shouldSkipNode(nextNode, columnFamilyToPrefixMap)) {
+ LOG.debug("Skipping next node: '{}' with startKey: '{}' and " +
+ "endKey: '{}' because it doesn't have keys related to " +
+ "columnFamilyToPrefixMap: '{}'.",
+ nextNode.getFileName(), nextNode.getStartKey(),
+ nextNode.getEndKey(), columnFamilyToPrefixMap);
continue;
}
- if (destSnapFiles.contains(node.getFileName())) {
+ if (sameFiles.contains(nextNode.getFileName()) ||
+ differentFiles.contains(nextNode.getFileName())) {
+ LOG.debug("Skipping known processed SST: {}",
+ nextNode.getFileName());
+ continue;
+ }
+
+ if (destSnapFiles.contains(nextNode.getFileName())) {
LOG.debug("Src '{}' and dest '{}' have the same SST: {}",
- src.getDbPath(), dest.getDbPath(), node.getFileName());
- sameFiles.add(node.getFileName());
+ src.getDbPath(), dest.getDbPath(), nextNode.getFileName());
+ sameFiles.add(nextNode.getFileName());
continue;
}
// Queue different SST to the next level
LOG.debug("Src '{}' and dest '{}' have a different SST: {}",
- src.getDbPath(), dest.getDbPath(), node.getFileName());
- nextLevel.add(node);
+ src.getDbPath(), dest.getDbPath(), nextNode.getFileName());
+ nextLevel.add(nextNode);
}
}
currentLevel = nextLevel;
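
The hunks above amount to a level-order walk of the compaction DAG with one new pruning step. A simplified sketch of the loop's shape, using plain strings instead of CompactionNode and omitting logging and the generation-based short-circuit:

    import java.util.Collections;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.function.Predicate;

    final class DiffWalkSketch {
      static void walk(Map<String, List<String>> successors, Set<String> src,
          Set<String> dest, Predicate<String> skipNode,
          Set<String> same, Set<String> different) {
        Set<String> current = new HashSet<>();
        for (String file : src) {
          if (dest.contains(file)) {
            same.add(file);        // shared by both snapshots as-is
          } else {
            current.add(file);
          }
        }
        while (!current.isEmpty()) {
          Set<String> next = new HashSet<>();
          for (String file : current) {
            List<String> succ =
                successors.getOrDefault(file, Collections.emptyList());
            if (succ.isEmpty()) {
              different.add(file); // never compacted further: a real diff
              continue;
            }
            for (String s : succ) {
              if (skipNode.test(s)) {
                continue;          // the new prefix-based pruning
              }
              if (same.contains(s) || different.contains(s)) {
                continue;          // already classified on an earlier level
              }
              if (dest.contains(s)) {
                same.add(s);
              } else {
                next.add(s);       // keep walking this branch
              }
            }
          }
          current = next;
        }
      }
    }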
@@ -1105,7 +1122,8 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
* @return CompactionNode
*/
private CompactionNode addNodeToDAG(String file, String snapshotID,
- long seqNum) {
+ long seqNum, String startKey,
+ String endKey, String columnFamily) {
long numKeys = 0L;
try {
numKeys = getSSTFileSummary(file);
@@ -1114,8 +1132,8 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
} catch (FileNotFoundException e) {
LOG.info("Can't find SST '{}'", file);
}
- CompactionNode fileNode = new CompactionNode(
- file, snapshotID, numKeys, seqNum);
+ CompactionNode fileNode = new CompactionNode(file, snapshotID, numKeys,
+ seqNum, startKey, endKey, columnFamily);
forwardCompactionDAG.addNode(fileNode);
backwardCompactionDAG.addNode(fileNode);
@@ -1142,12 +1160,14 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
for (CompactionFileInfo outfile : outputFiles) {
final CompactionNode outfileNode = compactionNodeMap.computeIfAbsent(
outfile.getFileName(),
- file -> addNodeToDAG(file, snapshotId, seqNum));
+ file -> addNodeToDAG(file, snapshotId, seqNum, outfile.getStartKey(),
+ outfile.getEndKey(), outfile.getColumnFamily()));
for (CompactionFileInfo infile : inputFiles) {
final CompactionNode infileNode = compactionNodeMap.computeIfAbsent(
infile.getFileName(),
- file -> addNodeToDAG(file, snapshotId, seqNum));
+          file -> addNodeToDAG(file, snapshotId, seqNum, infile.getStartKey(),
+ infile.getEndKey(), infile.getColumnFamily()));
// Draw the edges
if (!outfileNode.getFileName().equals(infileNode.getFileName())) {
forwardCompactionDAG.putEdge(outfileNode, infileNode);
@@ -1526,6 +1546,8 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
for (String sstFile : sstFiles) {
String fileName = sstFile.substring(fileNameOffset,
sstFile.length() - SST_FILE_EXTENSION_LENGTH);
+ CompactionFileInfo.Builder fileInfoBuilder =
+ new CompactionFileInfo.Builder(fileName);
SstFileReader fileReader = new SstFileReader(options);
try {
fileReader.open(sstFile);
@@ -1536,18 +1558,52 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
String startKey = StringUtils.bytes2String(iterator.key());
iterator.seekToLast();
String endKey = StringUtils.bytes2String(iterator.key());
-
- CompactionFileInfo fileInfo = new CompactionFileInfo.Builder(fileName)
- .setStartRange(startKey)
+ fileInfoBuilder.setStartRange(startKey)
.setEndRange(endKey)
- .setColumnFamily(columnFamily)
- .build();
- response.add(fileInfo);
+ .setColumnFamily(columnFamily);
} catch (RocksDBException rocksDBException) {
- throw new RuntimeException("Failed to read SST file: " + sstFile,
- rocksDBException);
+ // Ideally it should not happen. If it does just log the exception.
+ // And let the compaction complete without the exception.
+ // Throwing exception in compaction listener could fail the RocksDB.
+ // In case of exception, compaction node will be missing start key,
+ // end key and column family. And it will continue the traversal as
+ // it was before HDDS-8940.
+ LOG.warn("Failed to read SST file: {}.", sstFile, rocksDBException);
}
+ response.add(fileInfoBuilder.build());
}
return response;
}
+
+ @VisibleForTesting
+ boolean shouldSkipNode(CompactionNode node,
+ Map<String, String> columnFamilyToPrefixMap) {
+ // This is for backward compatibility. Before the compaction log table
+    // migration, startKey, endKey and columnFamily information is not persisted
+ // in compaction log files.
+ // Also for the scenario when there is an exception in reading SST files
+ // for the file node.
+ if (node.getStartKey() == null || node.getEndKey() == null ||
+ node.getColumnFamily() == null) {
+ LOG.debug("Compaction node with fileName: {} doesn't have startKey, " +
+ "endKey and columnFamily details.", node.getFileName());
+ return false;
+ }
+
+ if (MapUtils.isEmpty(columnFamilyToPrefixMap)) {
+ LOG.debug("Provided columnFamilyToPrefixMap is null or empty.");
+ return false;
+ }
+
+ if (!columnFamilyToPrefixMap.containsKey(node.getColumnFamily())) {
+ LOG.debug("SstFile node: {} is for columnFamily: {} while filter map " +
+ "contains columnFamilies: {}.", node.getFileName(),
+ node.getColumnFamily(), columnFamilyToPrefixMap.keySet());
+ return true;
+ }
+
+ String keyPrefix = columnFamilyToPrefixMap.get(node.getColumnFamily());
+    return !RocksDiffUtils.isKeyWithPrefixPresent(keyPrefix, node.getStartKey(),
+ node.getEndKey());
+ }
}
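
A quick worked example of the three branches of shouldSkipNode, reusing node 000078 from the test data below (a sketch: `differ` stands in for a RocksDBCheckpointDiffer instance, and the method is package-private, hence the @VisibleForTesting annotation):

    Map<String, String> bucket1 = new HashMap<>();
    bucket1.put("keyTable", "/volume/bucket1/");
    Map<String, String> bucket3 = new HashMap<>();
    bucket3.put("keyTable", "/volume/bucket3/");

    CompactionNode node = new CompactionNode("000078", "snap", 1000L, 178L,
        "/volume/bucket1/key-0000001411", "/volume/bucket2/key-0000099649",
        "keyTable");
    CompactionNode legacy = new CompactionNode("000063", "snap", 1000L,
        178L, null, null, null);

    differ.shouldSkipNode(legacy, bucket1);  // false: no metadata, never skip
    differ.shouldSkipNode(node, bucket1);    // false: range overlaps bucket1
    differ.shouldSkipNode(node, bucket3);    // true: range ends in bucket2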
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDiffUtils.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDiffUtils.java
index d1acaf0c86..4b7da351f1 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDiffUtils.java
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDiffUtils.java
@@ -48,9 +48,12 @@ public final class RocksDiffUtils {
}
public static boolean isKeyWithPrefixPresent(String prefixForColumnFamily,
- String firstDbKey, String lastDbKey) {
- return firstDbKey.compareTo(prefixForColumnFamily) <= 0
- && prefixForColumnFamily.compareTo(lastDbKey) <= 0;
+ String firstDbKey,
+ String lastDbKey) {
+ String firstKeyPrefix = constructBucketKey(firstDbKey);
+ String endKeyPrefix = constructBucketKey(lastDbKey);
+ return firstKeyPrefix.compareTo(prefixForColumnFamily) <= 0
+ && prefixForColumnFamily.compareTo(endKeyPrefix) <= 0;
}
public static String constructBucketKey(String keyName) {
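
This normalization is the heart of the fix. The old check compared the raw first/last keys against the bucket prefix, so an SST whose keys all start with the prefix could still look out of range lexicographically; now both ends are reduced to their /volume/bucket/ prefix first. A worked example matching the last assertion in the new TestRocksDiffUtils below (assuming constructBucketKey truncates a key to its bucket prefix):

    String prefix = "/volume/bucket/";
    String firstDbKey = "/volume/bucket/key-1";
    String lastDbKey = "/volume/bucket2/key-97";

    // Old check on raw keys: "/volume/bucket/key-1" sorts AFTER the
    // prefix, so the range looked disjoint and the file could be wrongly
    // excluded -- one way SST files went missing on the snapDiff path.
    boolean before = firstDbKey.compareTo(prefix) <= 0
        && prefix.compareTo(lastDbKey) <= 0;           // false

    // New check on normalized prefixes:
    // "/volume/bucket/" <= "/volume/bucket/" <= "/volume/bucket2/"
    boolean after = "/volume/bucket/".compareTo(prefix) <= 0
        && prefix.compareTo("/volume/bucket2/") <= 0;  // true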
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
index 18118b142e..13a4cfbdf5 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
@@ -33,8 +33,11 @@ import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@@ -471,7 +474,6 @@ public class TestRocksDBCheckpointDiffer {
destSnapshot,
srcSnapshotSstFiles,
destSnapshotSstFiles,
- rocksDBCheckpointDiffer.getForwardCompactionDAG(),
actualSameSstFiles,
actualDiffSstFiles);
} catch (RuntimeException rtEx) {
@@ -850,7 +852,8 @@ public class TestRocksDBCheckpointDiffer {
sstFile -> new CompactionNode(sstFile,
UUID.randomUUID().toString(),
1000L,
- Long.parseLong(sstFile.substring(0, 6))
+ Long.parseLong(sstFile.substring(0, 6)),
+ null, null, null
))
.collect(Collectors.toList()))
.collect(Collectors.toList());
@@ -1518,4 +1521,303 @@ public class TestRocksDBCheckpointDiffer {
fileOutputStream.write(context.getBytes(UTF_8));
}
}
+
+  private final List<CompactionLogEntry> compactionLogEntryList = Arrays.asList(
+ new CompactionLogEntry(101, System.currentTimeMillis(),
+ Arrays.asList(
+ new CompactionFileInfo("000068", "/volume/bucket2",
+ "/volume/bucket2", "bucketTable"),
+ new CompactionFileInfo("000057", "/volume/bucket1",
+ "/volume/bucket1", "bucketTable")),
+ Collections.singletonList(
+ new CompactionFileInfo("000086", "/volume/bucket1",
+ "/volume/bucket2", "bucketTable")),
+ null),
+ new CompactionLogEntry(178, System.currentTimeMillis(),
+ Arrays.asList(new CompactionFileInfo("000078",
+ "/volume/bucket1/key-0000001411",
+ "/volume/bucket2/key-0000099649",
+ "keyTable"),
+ new CompactionFileInfo("000075",
+ "/volume/bucket1/key-0000016536",
+ "/volume/bucket2/key-0000098897",
+ "keyTable"),
+ new CompactionFileInfo("000073",
+ "/volume/bucket1/key-0000000730",
+ "/volume/bucket2/key-0000097010",
+ "keyTable"),
+ new CompactionFileInfo("000071",
+ "/volume/bucket1/key-0000001820",
+ "/volume/bucket2/key-0000097895",
+ "keyTable"),
+ new CompactionFileInfo("000063",
+ "/volume/bucket1/key-0000001016",
+ "/volume/bucket1/key-0000099930",
+ "keyTable")),
+ Collections.singletonList(new CompactionFileInfo("000081",
+ "/volume/bucket1/key-0000000730",
+ "/volume/bucket2/key-0000099649",
+ "keyTable")),
+ null
+ ),
+ new CompactionLogEntry(233, System.currentTimeMillis(),
+ Arrays.asList(
+ new CompactionFileInfo("000086", "/volume/bucket1",
+ "/volume/bucket2", "bucketTable"),
+ new CompactionFileInfo("000088", "/volume/bucket3",
+ "/volume/bucket3", "bucketTable")),
+ Collections.singletonList(
+ new CompactionFileInfo("000110", "/volume/bucket1",
+ "/volume/bucket3", "bucketTable")
+ ),
+ null),
+ new CompactionLogEntry(256, System.currentTimeMillis(),
+ Arrays.asList(new CompactionFileInfo("000081",
+ "/volume/bucket1/key-0000000730",
+ "/volume/bucket2/key-0000099649",
+ "keyTable"),
+ new CompactionFileInfo("000103",
+ "/volume/bucket1/key-0000017460",
+ "/volume/bucket3/key-0000097450",
+ "keyTable"),
+ new CompactionFileInfo("000099",
+ "/volume/bucket1/key-0000002310",
+ "/volume/bucket3/key-0000098286",
+ "keyTable"),
+ new CompactionFileInfo("000097",
+ "/volume/bucket1/key-0000005965",
+ "/volume/bucket3/key-0000099136",
+ "keyTable"),
+ new CompactionFileInfo("000095",
+ "/volume/bucket1/key-0000012424",
+ "/volume/bucket3/key-0000083904",
+ "keyTable")),
+ Collections.singletonList(new CompactionFileInfo("000106",
+ "/volume/bucket1/key-0000000730",
+ "/volume/bucket3/key-0000099136",
+ "keyTable")),
+ null),
+ new CompactionLogEntry(397, now(),
+ Arrays.asList(new CompactionFileInfo("000106",
+ "/volume/bucket1/key-0000000730",
+ "/volume/bucket3/key-0000099136",
+ "keyTable"),
+ new CompactionFileInfo("000128",
+ "/volume/bucket2/key-0000005031",
+ "/volume/bucket3/key-0000084385",
+ "keyTable"),
+ new CompactionFileInfo("000125",
+ "/volume/bucket2/key-0000003491",
+ "/volume/bucket3/key-0000088414",
+ "keyTable"),
+ new CompactionFileInfo("000123",
+ "/volume/bucket2/key-0000007390",
+ "/volume/bucket3/key-0000094627",
+ "keyTable"),
+ new CompactionFileInfo("000121",
+ "/volume/bucket2/key-0000003232",
+ "/volume/bucket3/key-0000094246",
+ "keyTable")),
+ Collections.singletonList(new CompactionFileInfo("000131",
+ "/volume/bucket1/key-0000000730",
+ "/volume/bucket3/key-0000099136",
+ "keyTable")),
+ null
+ )
+ );
+
+ private static Map<String, String> columnFamilyToPrefixMap1 =
+ new HashMap<String, String>() {
+ {
+ put("keyTable", "/volume/bucket1/");
+ // Simply using bucketName instead of ID for the test.
+ put("directoryTable", "/volume/bucket1/");
+ put("fileTable", "/volume/bucket1/");
+ }
+ };
+
+ private static Map<String, String> columnFamilyToPrefixMap2 =
+ new HashMap<String, String>() {
+ {
+ put("keyTable", "/volume/bucket2/");
+ // Simply using bucketName instead of ID for the test.
+ put("directoryTable", "/volume/bucket2/");
+ put("fileTable", "/volume/bucket2/");
+ }
+ };
+
+ private static Map<String, String> columnFamilyToPrefixMap3 =
+ new HashMap<String, String>() {
+ {
+ put("keyTable", "/volume/bucket3/");
+ // Simply using bucketName instead of ID for the test.
+ put("directoryTable", "/volume/bucket3/");
+ put("fileTable", "/volume/bucket3/");
+ }
+ };
+
+ /**
+ * Test cases for testGetSSTDiffListWithoutDB.
+ */
+ private static Stream<Arguments> casesGetSSTDiffListWithoutDB2() {
+ return Stream.of(
+ Arguments.of("Test case 1.",
+ ImmutableSet.of("000081"),
+ ImmutableSet.of("000063"),
+ ImmutableSet.of("000063"),
+ ImmutableSet.of("000078", "000071", "000075", "000073"),
+ columnFamilyToPrefixMap1),
+ Arguments.of("Test case 2.",
+ ImmutableSet.of("000106"),
+ ImmutableSet.of("000081"),
+ ImmutableSet.of("000081"),
+ ImmutableSet.of("000099", "000103", "000097", "000095"),
+ columnFamilyToPrefixMap1),
+ Arguments.of("Test case 3.",
+ ImmutableSet.of("000106"),
+ ImmutableSet.of("000063"),
+ ImmutableSet.of("000063"),
+ ImmutableSet.of("000078", "000071", "000075", "000073", "000103",
+ "000099", "000097", "000095"),
+ columnFamilyToPrefixMap1),
+ Arguments.of("Test case 4.",
+ ImmutableSet.of("000131"),
+ ImmutableSet.of("000106"),
+ ImmutableSet.of("000106"),
+ ImmutableSet.of("000123", "000121", "000128", "000125"),
+ columnFamilyToPrefixMap2),
+ Arguments.of("Test case 5.",
+ ImmutableSet.of("000131"),
+ ImmutableSet.of("000081"),
+ ImmutableSet.of("000081"),
+ ImmutableSet.of("000123", "000121", "000128", "000125", "000103",
+ "000099", "000097", "000095"),
+ columnFamilyToPrefixMap2),
+ Arguments.of("Test case 6.",
+ ImmutableSet.of("000147", "000131", "000141"),
+ ImmutableSet.of("000131"),
+ ImmutableSet.of("000131"),
+ ImmutableSet.of("000147", "000141"),
+ columnFamilyToPrefixMap3),
+ Arguments.of("Test case 7.",
+ ImmutableSet.of("000147", "000131", "000141"),
+ ImmutableSet.of("000106"),
+ ImmutableSet.of("000106"),
+ ImmutableSet.of("000123", "000121", "000128", "000125", "000147",
+ "000141"),
+ columnFamilyToPrefixMap3)
+ );
+ }
+
+ /**
+ * Tests core SST diff list logic. Does not involve DB.
+ * Focuses on testing edge cases in internalGetSSTDiffList().
+ */
+ @ParameterizedTest(name = "{0}")
+ @MethodSource("casesGetSSTDiffListWithoutDB2")
+ public void testGetSSTDiffListWithoutDB2(
+ String description,
+ Set<String> srcSnapshotSstFiles,
+ Set<String> destSnapshotSstFiles,
+ Set<String> expectedSameSstFiles,
+ Set<String> expectedDiffSstFiles,
+ Map<String, String> columnFamilyToPrefixMap
+ ) {
+ compactionLogEntryList.forEach(entry ->
+ rocksDBCheckpointDiffer.addToCompactionLogTable(entry));
+
+ rocksDBCheckpointDiffer.loadAllCompactionLogs();
+
+    // Snapshot info is used for logging purposes and short-circuiting traversal.
+ // Using gen 0 for this test.
+ DifferSnapshotInfo mockedSourceSnapshot = new DifferSnapshotInfo(
+ "/path/to/dbcp1", UUID.randomUUID(), 0L, columnFamilyToPrefixMap,
null);
+ DifferSnapshotInfo mockedDestinationSnapshot = new DifferSnapshotInfo(
+ "/path/to/dbcp2", UUID.randomUUID(), 0L, columnFamilyToPrefixMap,
null);
+
+ Set<String> actualSameSstFiles = new HashSet<>();
+ Set<String> actualDiffSstFiles = new HashSet<>();
+
+ rocksDBCheckpointDiffer.internalGetSSTDiffList(
+ mockedSourceSnapshot,
+ mockedDestinationSnapshot,
+ srcSnapshotSstFiles,
+ destSnapshotSstFiles,
+ actualSameSstFiles,
+ actualDiffSstFiles);
+
+ // Check same and different SST files result
+ Assertions.assertEquals(expectedSameSstFiles, actualSameSstFiles);
+ Assertions.assertEquals(expectedDiffSstFiles, actualDiffSstFiles);
+ }
+
+ private static Stream<Arguments> shouldSkipNodeCases() {
+ List<Boolean> expectedResponse1 = Arrays.asList(true, false, true, false,
+ false, false, false, false, true, true, false, false, false, false,
+ false, true, true, true, true, true, false);
+ List<Boolean> expectedResponse2 = Arrays.asList(true, true, true, false,
+ false, false, false, false, true, true, false, false, false, false,
+ false, true, false, false, false, false, false);
+ List<Boolean> expectedResponse3 = Arrays.asList(true, true, true, true,
+ true, true, true, true, true, true, false, false, false, false, false,
+ true, false, false, false, false, false);
+ return Stream.of(
+ Arguments.of(columnFamilyToPrefixMap1, expectedResponse1),
+ Arguments.of(columnFamilyToPrefixMap2, expectedResponse2),
+ Arguments.of(columnFamilyToPrefixMap3, expectedResponse3));
+ }
+
+ @ParameterizedTest()
+ @MethodSource("shouldSkipNodeCases")
+ public void testShouldSkipNode(Map<String, String> columnFamilyToPrefixMap,
+ List<Boolean> expectedResponse) {
+ compactionLogEntryList.forEach(entry ->
+ rocksDBCheckpointDiffer.addToCompactionLogTable(entry));
+
+ rocksDBCheckpointDiffer.loadAllCompactionLogs();
+
+ List<Boolean> actualResponse = rocksDBCheckpointDiffer
+ .getCompactionNodeMap().values().stream()
+ .sorted(Comparator.comparing(CompactionNode::getFileName))
+ .map(node ->
+ rocksDBCheckpointDiffer.shouldSkipNode(node,
+ columnFamilyToPrefixMap))
+ .collect(Collectors.toList());
+
+ assertEquals(expectedResponse, actualResponse);
+ }
+
+ private static Stream<Arguments> shouldSkipNodeEdgeCases() {
+ CompactionNode node = new CompactionNode("fileName",
+ "snapshotId", 100, 100, "startKey", "endKey", "columnFamily");
+ CompactionNode nullColumnFamilyNode = new CompactionNode("fileName",
+ "snapshotId", 100, 100, "startKey", "endKey", null);
+ CompactionNode nullStartKeyNode = new CompactionNode("fileName",
+ "snapshotId", 100, 100, null, "endKey", "columnFamily");
+ CompactionNode nullEndKeyNode = new CompactionNode("fileName",
+ "snapshotId", 100, 100, "startKey", null, "columnFamily");
+
+ return Stream.of(
+ Arguments.of(node, Collections.emptyMap(), false),
+ Arguments.of(node, columnFamilyToPrefixMap1, true),
+ Arguments.of(nullColumnFamilyNode, columnFamilyToPrefixMap1, false),
+ Arguments.of(nullStartKeyNode, columnFamilyToPrefixMap1, false),
+ Arguments.of(nullEndKeyNode, columnFamilyToPrefixMap1, false));
+ }
+
+ @ParameterizedTest()
+ @MethodSource("shouldSkipNodeEdgeCases")
+ public void testShouldSkipNodeEdgeCase(
+ CompactionNode node,
+ Map<String, String> columnFamilyToPrefixMap,
+ boolean expectedResponse
+ ) {
+ compactionLogEntryList.forEach(entry ->
+ rocksDBCheckpointDiffer.addToCompactionLogTable(entry));
+
+ rocksDBCheckpointDiffer.loadAllCompactionLogs();
+
+ assertEquals(expectedResponse, rocksDBCheckpointDiffer.shouldSkipNode(node,
+ columnFamilyToPrefixMap));
+ }
}
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDiffUtils.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDiffUtils.java
new file mode 100644
index 0000000000..67233676f0
--- /dev/null
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDiffUtils.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ozone.rocksdiff;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Class to test RocksDiffUtils.
+ */
+public class TestRocksDiffUtils {
+ @Test
+ public void testFilterFunction() {
+ assertTrue(RocksDiffUtils.isKeyWithPrefixPresent(
+ "/vol1/bucket1/",
+ "/vol1/bucket1/key1",
+ "/vol1/bucket1/key1"));
+ assertTrue(RocksDiffUtils.isKeyWithPrefixPresent(
+ "/vol1/bucket3/",
+ "/vol1/bucket1/key1",
+ "/vol1/bucket5/key1"));
+ assertFalse(RocksDiffUtils.isKeyWithPrefixPresent(
+ "/vol1/bucket5/",
+ "/vol1/bucket1/key1",
+ "/vol1/bucket4/key9"));
+ assertFalse(RocksDiffUtils.isKeyWithPrefixPresent(
+ "/vol1/bucket2/",
+ "/vol1/bucket1/key1",
+ "/vol1/bucket1/key1"));
+ assertFalse(RocksDiffUtils.isKeyWithPrefixPresent(
+ "/vol1/bucket/",
+ "/vol1/bucket1/key1",
+ "/vol1/bucket1/key1"));
+ assertTrue(RocksDiffUtils.isKeyWithPrefixPresent(
+ "/volume/bucket/",
+ "/volume/bucket/key-1",
+ "/volume/bucket2/key-97"));
+ }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java
index d21fdb3958..88fb010796 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java
@@ -32,7 +32,6 @@ import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.om.IOmMetadataReader;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OMMetadataManager;
-import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
import org.apache.hadoop.ozone.om.OmSnapshot;
import org.apache.hadoop.ozone.om.OmSnapshotManager;
import org.apache.hadoop.ozone.om.OzoneManager;
@@ -60,9 +59,7 @@ import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
@@ -72,6 +69,7 @@ import static org.apache.hadoop.ozone.OzoneConsts.DB_COMPACTION_LOG_DIR;
import static org.apache.hadoop.ozone.OzoneConsts.DB_COMPACTION_SST_BACKUP_DIR;
import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIFF_DIR;
+import static org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.getColumnFamilyToKeyPrefixMap;
/**
* Tests Freon, with MiniOzoneCluster.
@@ -157,26 +155,11 @@ public class TestOMSnapshotDAG {
// persisted at the time of snapshot creation, as the snapshot generation
return new DifferSnapshotInfo(checkpointPath, snapshotInfo.getSnapshotId(),
snapshotInfo.getDbTxSequenceNumber(),
- getTablePrefixes(omMetadataManager, volumeName, bucketName),
+ getColumnFamilyToKeyPrefixMap(omMetadataManager, volumeName,
+ bucketName),
snapshotDB);
}
- private Map<String, String> getTablePrefixes(
-      OMMetadataManager omMetadataManager, String volumeName, String bucketName)
- throws IOException {
- HashMap<String, String> tablePrefixes = new HashMap<>();
-    String volumeId = String.valueOf(omMetadataManager.getVolumeId(volumeName));
- String bucketId =
- String.valueOf(omMetadataManager.getBucketId(volumeName, bucketName));
- tablePrefixes.put(OmMetadataManagerImpl.KEY_TABLE,
- OM_KEY_PREFIX + volumeName + OM_KEY_PREFIX + bucketName);
- tablePrefixes.put(OmMetadataManagerImpl.FILE_TABLE,
- OM_KEY_PREFIX + volumeId + OM_KEY_PREFIX + bucketId);
- tablePrefixes.put(OmMetadataManagerImpl.DIRECTORY_TABLE,
- OM_KEY_PREFIX + volumeId + OM_KEY_PREFIX + bucketId);
- return tablePrefixes;
- }
-
@Test
public void testDAGReconstruction()
throws IOException, InterruptedException, TimeoutException {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
index d88bd13c0d..f77a079da2 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
@@ -97,6 +97,7 @@ import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVA
 import static org.apache.hadoop.ozone.om.snapshot.SnapshotDiffManager.getSnapshotRootPath;
 import static org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.checkSnapshotActive;
 import static org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.dropColumnFamilyHandle;
+import static org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.getOzonePathKeyForFso;
 import static org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.DONE;
/**
@@ -478,8 +479,8 @@ public final class OmSnapshotManager implements AutoCloseable {
String bucketName) throws IOException {
// Range delete start key (inclusive)
- final String keyPrefix = getOzonePathKeyWithVolumeBucketNames(
- omMetadataManager, volumeName, bucketName);
+ final String keyPrefix = getOzonePathKeyForFso(omMetadataManager,
+ volumeName, bucketName);
try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
iter = omMetadataManager.getDeletedDirTable().iterator(keyPrefix)) {
@@ -494,32 +495,6 @@ public final class OmSnapshotManager implements AutoCloseable {
}
}
- /**
- * Helper method to generate /volumeId/bucketId/ DB key prefix from given
- * volume name and bucket name as a prefix in FSO deletedDirectoryTable.
- * Follows:
- * {@link OmMetadataManagerImpl#getOzonePathKey(long, long, long, String)}.
- * <p>
- * Note: Currently, this is only intended to be a special use case in
- * {@link OmSnapshotManager}. If this is used elsewhere, consider moving this
- * to {@link OMMetadataManager}.
- *
- * @param volumeName volume name
- * @param bucketName bucket name
- * @return /volumeId/bucketId/
- * e.g. /-9223372036854772480/-9223372036854771968/
- */
- @VisibleForTesting
- public static String getOzonePathKeyWithVolumeBucketNames(
- OMMetadataManager omMetadataManager,
- String volumeName,
- String bucketName) throws IOException {
-
- final long volumeId = omMetadataManager.getVolumeId(volumeName);
-    final long bucketId = omMetadataManager.getBucketId(volumeName, bucketName);
- return OM_KEY_PREFIX + volumeId + OM_KEY_PREFIX + bucketId + OM_KEY_PREFIX;
- }
-
@VisibleForTesting
public SnapshotDiffManager getSnapshotDiffManager() {
return snapshotDiffManager;
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java
index 16a6ad99f5..599928fe6a 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java
@@ -18,15 +18,12 @@
*/
package org.apache.hadoop.ozone.om;
-
import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.utils.BackgroundService;
import org.apache.hadoop.hdds.utils.BackgroundTask;
import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
-import org.apache.hadoop.hdds.utils.BooleanTriFunction;
import org.apache.hadoop.hdds.utils.db.RDBStore;
import org.apache.hadoop.hdds.utils.db.RocksDatabase;
import org.apache.hadoop.hdds.utils.db.Table;
@@ -36,23 +33,21 @@ import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
import org.apache.hadoop.ozone.om.snapshot.ReferenceCounted;
import org.apache.hadoop.ozone.om.snapshot.SnapshotCache;
-import org.apache.ozone.rocksdiff.RocksDiffUtils;
import org.rocksdb.RocksDBException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_SST_DELETING_LIMIT_PER_TASK;
import static org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_SST_DELETING_LIMIT_PER_TASK_DEFAULT;
import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.SNAPSHOT_LOCK;
+import static org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.getColumnFamilyToKeyPrefixMap;
/**
* When snapshots are taken, an entire snapshot of the
@@ -83,18 +78,6 @@ public class SstFilteringService extends BackgroundService
private AtomicBoolean running;
- // Note: This filter only works till snapshots are readable only.
- // In the future, if snapshots are changed to writable as well,
- // this will need to be revisited.
- static final BooleanTriFunction<String, String, String, Boolean>
- FILTER_FUNCTION =
- (first, last, prefix) -> {
- String firstBucketKey = RocksDiffUtils.constructBucketKey(first);
- String lastBucketKey = RocksDiffUtils.constructBucketKey(last);
- return RocksDiffUtils
- .isKeyWithPrefixPresent(prefix, firstBucketKey, lastBucketKey);
- };
-
public SstFilteringService(long interval, TimeUnit unit, long serviceTimeout,
OzoneManager ozoneManager, OzoneConfiguration configuration) {
super("SstFilteringService", interval, unit, SST_FILTERING_CORE_POOL_SIZE,
@@ -192,8 +175,10 @@ public class SstFilteringService extends BackgroundService
LOG.debug("Processing snapshot {} to filter relevant SST Files",
snapShotTableKey);
- List<Pair<String, String>> prefixPairs = constructPrefixPairs(
- snapshotInfo);
+ Map<String, String> columnFamilyNameToPrefixMap =
+          getColumnFamilyToKeyPrefixMap(ozoneManager.getMetadataManager(),
+ snapshotInfo.getVolumeName(),
+ snapshotInfo.getBucketName());
try (
ReferenceCounted<IOmMetadataReader, SnapshotCache>
@@ -205,7 +190,7 @@ public class SstFilteringService extends BackgroundService
RocksDatabase db = rdbStore.getDb();
try (BootstrapStateHandler.Lock lock = getBootstrapStateLock()
.lock()) {
- db.deleteFilesNotMatchingPrefix(prefixPairs, FILTER_FUNCTION);
+ db.deleteFilesNotMatchingPrefix(columnFamilyNameToPrefixMap);
}
} catch (OMException ome) {
// FILE_NOT_FOUND is obtained when the snapshot is deleted
@@ -238,43 +223,8 @@ public class SstFilteringService extends BackgroundService
// nothing to return here
return BackgroundTaskResult.EmptyTaskResult.newResult();
}
-
- /**
- * @param snapshotInfo
- * @return a list of pairs (tableName,keyPrefix).
- * @throws IOException
- */
- private List<Pair<String, String>> constructPrefixPairs(
- SnapshotInfo snapshotInfo) throws IOException {
- String volumeName = snapshotInfo.getVolumeName();
- String bucketName = snapshotInfo.getBucketName();
-
-    long volumeId = ozoneManager.getMetadataManager().getVolumeId(volumeName);
- // TODO : HDDS-6984 buckets can be deleted via ofs
- // handle deletion of bucket case.
- long bucketId =
-        ozoneManager.getMetadataManager().getBucketId(volumeName, bucketName);
-
- String filterPrefix =
- OM_KEY_PREFIX + volumeName + OM_KEY_PREFIX + bucketName
- + OM_KEY_PREFIX;
-
- String filterPrefixFSO =
- OM_KEY_PREFIX + volumeId + OM_KEY_PREFIX + bucketId
- + OM_KEY_PREFIX;
-
- List<Pair<String, String>> prefixPairs = new ArrayList<>();
- prefixPairs
- .add(Pair.of(OmMetadataManagerImpl.KEY_TABLE, filterPrefix));
- prefixPairs.add(
- Pair.of(OmMetadataManagerImpl.DIRECTORY_TABLE, filterPrefixFSO));
- prefixPairs
- .add(Pair.of(OmMetadataManagerImpl.FILE_TABLE, filterPrefixFSO));
- return prefixPairs;
- }
}
-
@Override
public BackgroundTaskQueue getTasks() {
BackgroundTaskQueue queue = new BackgroundTaskQueue();
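
Net effect on the service: the hand-rolled prefix-pair list and FILTER_FUNCTION are gone, and the task body reduces to roughly the following (a sketch, assuming an open RocksDatabase handle `db` as in the hunk above):

    Map<String, String> columnFamilyToPrefixMap =
        getColumnFamilyToKeyPrefixMap(ozoneManager.getMetadataManager(),
            snapshotInfo.getVolumeName(), snapshotInfo.getBucketName());
    // One map lookup per live SST file inside RocksDatabase.
    db.deleteFilesNotMatchingPrefix(columnFamilyToPrefixMap);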
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
index b9b8fd10e5..0b38607f81 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
@@ -79,7 +79,6 @@ import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -115,12 +114,11 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_FORCE_FU
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIFF_DISABLE_NATIVE_LIBS;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SNAPSHOT_DIFF_DISABLE_NATIVE_LIBS_DEFAULT;
 import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DIRECTORY_TABLE;
-import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.FILE_TABLE;
-import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.KEY_TABLE;
 import static org.apache.hadoop.ozone.om.OmSnapshotManager.DELIMITER;
 import static org.apache.hadoop.ozone.om.helpers.SnapshotInfo.getTableKey;
 import static org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.checkSnapshotActive;
 import static org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.dropColumnFamilyHandle;
+import static org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.getColumnFamilyToKeyPrefixMap;
 import static org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.getSnapshotInfo;
 import static org.apache.hadoop.ozone.snapshot.CancelSnapshotDiffResponse.CancelMessage.CANCEL_FAILED;
 import static org.apache.hadoop.ozone.snapshot.CancelSnapshotDiffResponse.CancelMessage.CANCEL_ALREADY_CANCELLED_JOB;
@@ -367,24 +365,6 @@ public class SnapshotDiffManager implements AutoCloseable {
}
}
- private Map<String, String> getTablePrefixes(
- OMMetadataManager omMetadataManager,
- String volumeName, String bucketName) throws IOException {
- // Copied straight from TestOMSnapshotDAG. TODO: Dedup. Move this to util.
- Map<String, String> tablePrefixes = new HashMap<>();
-    String volumeId = String.valueOf(omMetadataManager.getVolumeId(volumeName));
- String bucketId = String.valueOf(
- omMetadataManager.getBucketId(volumeName, bucketName));
- tablePrefixes.put(KEY_TABLE,
- OM_KEY_PREFIX + volumeName + OM_KEY_PREFIX + bucketName +
- OM_KEY_PREFIX);
- tablePrefixes.put(FILE_TABLE,
- OM_KEY_PREFIX + volumeId + OM_KEY_PREFIX + bucketId + OM_KEY_PREFIX);
- tablePrefixes.put(DIRECTORY_TABLE,
- OM_KEY_PREFIX + volumeId + OM_KEY_PREFIX + bucketId + OM_KEY_PREFIX);
- return tablePrefixes;
- }
-
/**
* Convert from SnapshotInfo to DifferSnapshotInfo.
*/
@@ -402,7 +382,7 @@ public class SnapshotDiffManager implements AutoCloseable {
checkpointPath,
snapshotId,
dbTxSequenceNumber,
- getTablePrefixes(snapshotOMMM, volumeName, bucketName),
+ getColumnFamilyToKeyPrefixMap(snapshotOMMM, volumeName, bucketName),
((RDBStore)snapshotOMMM.getStore()).getDb().getManagedRocksDb());
}
@@ -904,8 +884,8 @@ public class SnapshotDiffManager implements AutoCloseable {
final BucketLayout bucketLayout = getBucketLayout(volumeName, bucketName,
fromSnapshot.getMetadataManager());
Map<String, String> tablePrefixes =
- getTablePrefixes(toSnapshot.getMetadataManager(), volumeName,
- bucketName);
+ getColumnFamilyToKeyPrefixMap(toSnapshot.getMetadataManager(),
+ volumeName, bucketName);
boolean useFullDiff = snapshotForceFullDiff || forceFullDiff;
boolean performNonNativeDiff = diffDisableNativeLibs ||
disableNativeDiff;
@@ -1215,8 +1195,8 @@ public class SnapshotDiffManager implements AutoCloseable {
LOG.debug("Calling RocksDBCheckpointDiffer");
try {
- List<String> sstDiffList =
- differ.getSSTDiffListWithFullPath(toDSI, fromDSI, diffDir);
+ List<String> sstDiffList = differ.getSSTDiffListWithFullPath(toDSI,
+ fromDSI, diffDir);
deltaFiles.addAll(sstDiffList);
} catch (Exception exception) {
LOG.warn("Failed to get SST diff file using RocksDBCheckpointDiffer. "
+
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotUtils.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotUtils.java
index ef7e4e895e..e9b0aa30fd 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotUtils.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotUtils.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.ozone.om.snapshot;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
import org.apache.hadoop.ozone.om.OmSnapshotManager;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.SnapshotChainManager;
@@ -32,8 +34,14 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
import java.util.UUID;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.DIRECTORY_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.FILE_TABLE;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.KEY_TABLE;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_NOT_FOUND;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TIMEOUT;
@@ -155,4 +163,69 @@ public final class SnapshotUtils {
}
return null;
}
+
+ /**
+   * Returns a map of column family to key prefix for the keys in the
+   * table for the given volume and bucket. The column families covered
+   * are keyTable, dirTable and fileTable.
+ */
+ public static Map<String, String> getColumnFamilyToKeyPrefixMap(
+ OMMetadataManager omMetadataManager,
+ String volumeName,
+ String bucketName
+ ) throws IOException {
+ String keyPrefix = getOzonePathKey(volumeName, bucketName);
+ String keyPrefixFso = getOzonePathKeyForFso(omMetadataManager, volumeName,
+ bucketName);
+
+ Map<String, String> columnFamilyToPrefixMap = new HashMap<>();
+ columnFamilyToPrefixMap.put(KEY_TABLE, keyPrefix);
+ columnFamilyToPrefixMap.put(DIRECTORY_TABLE, keyPrefixFso);
+ columnFamilyToPrefixMap.put(FILE_TABLE, keyPrefixFso);
+ return columnFamilyToPrefixMap;
+ }
+
+ /**
+   * Helper method to generate /volumeName/bucketName/ DB key prefix from
+ * given volume name and bucket name as a prefix for legacy and OBS buckets.
+ * Follows:
+ * {@link OmMetadataManagerImpl#getOzonePathKey(long, long, long, String)}.
+ * <p>
+ * Note: Currently, this is only intended to be a special use case in
+ * Snapshot. If this is used elsewhere, consider moving this to
+   * {@link OMMetadataManager}.
+ *
+ * @param volumeName volume name
+ * @param bucketName bucket name
+ * @return /volumeName/bucketName/
+ */
+ public static String getOzonePathKey(String volumeName,
+ String bucketName) throws IOException {
+ return OM_KEY_PREFIX + volumeName + OM_KEY_PREFIX + bucketName +
+ OM_KEY_PREFIX;
+ }
+
+ /**
+ * Helper method to generate /volumeId/bucketId/ DB key prefix from given
+ * volume name and bucket name as a prefix for FSO buckets.
+ * Follows:
+ * {@link OmMetadataManagerImpl#getOzonePathKey(long, long, long, String)}.
+ * <p>
+ * Note: Currently, this is only intended to be a special use case in
+ * Snapshot. If this is used elsewhere, consider moving this to
+ * {@link OMMetadataManager}.
+ *
+ * @param volumeName volume name
+ * @param bucketName bucket name
+ * @return /volumeId/bucketId/
+ * e.g. /-9223372036854772480/-9223372036854771968/
+ */
+ public static String getOzonePathKeyForFso(OMMetadataManager metadataManager,
+ String volumeName,
+ String bucketName)
+ throws IOException {
+ final long volumeId = metadataManager.getVolumeId(volumeName);
+ final long bucketId = metadataManager.getBucketId(volumeName, bucketName);
+ return OM_KEY_PREFIX + volumeId + OM_KEY_PREFIX + bucketId + OM_KEY_PREFIX;
+ }
}
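
To make the two prefix flavors concrete: for a hypothetical volume vol1 (id -9223372036854772480) and bucket buck1 (id -9223372036854771968), the helper would return roughly:

    Map<String, String> map =
        getColumnFamilyToKeyPrefixMap(omMetadataManager, "vol1", "buck1");
    // keyTable       -> /vol1/buck1/                                (names)
    // directoryTable -> /-9223372036854772480/-9223372036854771968/ (ids)
    // fileTable      -> /-9223372036854772480/-9223372036854771968/ (ids)

keyTable holds legacy/OBS keys under name-based paths, while FSO's directoryTable and fileTable key on object ids, hence the two helpers.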
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestSstFilteringService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestSstFilteringService.java
index 7a17c70ef7..2009f7e5da 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestSstFilteringService.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestSstFilteringService.java
@@ -372,30 +372,6 @@ public class TestSstFilteringService {
managerProtocol.commitKey(keyArg, session.getId());
}
- @Test
- public void testFilterFunction() {
- assertTrue(SstFilteringService.FILTER_FUNCTION.apply(
- "/vol1/bucket1/key1",
- "/vol1/bucket1/key1",
- "/vol1/bucket1/"));
- assertTrue(SstFilteringService.FILTER_FUNCTION.apply(
- "/vol1/bucket1/key1",
- "/vol1/bucket5/key1",
- "/vol1/bucket3/"));
- assertFalse(SstFilteringService.FILTER_FUNCTION.apply(
- "/vol1/bucket1/key1",
- "/vol1/bucket4/key9",
- "/vol1/bucket5/"));
- assertFalse(SstFilteringService.FILTER_FUNCTION.apply(
- "/vol1/bucket1/key1",
- "/vol1/bucket1/key1",
- "/vol1/bucket2/"));
- assertFalse(SstFilteringService.FILTER_FUNCTION.apply(
- "/vol1/bucket1/key1",
- "/vol1/bucket1/key1",
- "/vol1/bucket/"));
- }
-
/**
* Test to verify the data integrity after SST filtering service runs.
* This test creates 150 keys randomly in one of the three buckets. It also
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/snapshot/TestOMSnapshotCreateResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/snapshot/TestOMSnapshotCreateResponse.java
index e837a17360..f0b729d103 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/snapshot/TestOMSnapshotCreateResponse.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/snapshot/TestOMSnapshotCreateResponse.java
@@ -29,10 +29,10 @@ import java.util.UUID;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;
-import org.apache.hadoop.ozone.om.OmSnapshotManager;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils;
import org.apache.hadoop.util.Time;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
@@ -214,8 +214,7 @@ public class TestOMSnapshotCreateResponse {
// Add deletedDirectoryTable key entries that "surround" the snapshot scope
Set<String> sentinelKeys = new HashSet<>();
- final String dbKeyPfx =
- OmSnapshotManager.getOzonePathKeyWithVolumeBucketNames(
+ final String dbKeyPfx = SnapshotUtils.getOzonePathKeyForFso(
omMetadataManager, volumeName, bucketName);
// Calculate offset to bucketId's last character in dbKeyPfx.