Copilot commented on code in PR #9227:
URL: https://github.com/apache/ozone/pull/9227#discussion_r2480283461
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java:
##########
@@ -63,28 +94,25 @@
public class SnapshotDefragService extends BackgroundService
implements BootstrapStateHandler {
- private static final Logger LOG =
- LoggerFactory.getLogger(SnapshotDefragService.class);
+  private static final Logger LOG = LoggerFactory.getLogger(SnapshotDefragService.class);
   // Use only a single thread for snapshot defragmentation to avoid conflicts
   private static final int DEFRAG_CORE_POOL_SIZE = 1;
+  private static final String CHECKPOINT_STATE_DEFRAGGED_DIR = OM_SNAPSHOT_CHECKPOINT_DEFRAGGED_DIR;
+  private static final String TEMP_DIFF_DIR = "tempDiffSstFiles"; // TODO: Put this in OzoneConsts?
Review Comment:
The constant `TEMP_DIFF_DIR` should be moved to `OzoneConsts.java` for
consistency with other directory constants like `OM_SNAPSHOT_CHECKPOINT_DIR`.
This improves maintainability and ensures all path-related constants are
centralized.
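A minimal sketch of the suggested move (the constant name `OM_SNAPSHOT_TEMP_DIFF_DIR` is an assumption for illustration; only the value `"tempDiffSstFiles"` comes from the PR):
```java
// Hypothetical addition to org.apache.hadoop.ozone.OzoneConsts, next to the
// other snapshot directory constants such as OM_SNAPSHOT_CHECKPOINT_DIR.
public static final String OM_SNAPSHOT_TEMP_DIFF_DIR = "tempDiffSstFiles";
```
`SnapshotDefragService` could then drop its local literal, reference the shared constant, and resolve the inline TODO.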
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java:
##########
@@ -149,14 +175,162 @@ private boolean needsDefragmentation(SnapshotInfo snapshotInfo) {
     }
   }
+  /**
+   * Returns a lexicographically higher string by appending a byte with maximum value.
+   * For example, "/" -> "/\xFF"
+   */
+  private String getLexicographicallyHigherString(String str) {
+    return str + "\uFFFF";
+  }
+
+  /**
+   * Deletes unwanted key ranges from a column family.
+   * Deletes ranges: ["", prefix) and [lexHigher(prefix), lexHigher("/")]
+   */
+  private void deleteUnwantedRanges(RocksDatabase db, RocksDatabase.ColumnFamily cf,
+      String prefix, String cfName) throws RocksDatabaseException {
+    if (cf == null) {
+      LOG.warn("Column family {} not found, skipping range deletion", cfName);
+      return;
+    }
+
+    try (ManagedWriteBatch writeBatch = new ManagedWriteBatch()) {
+      // Delete range ["", prefix)
+      byte[] emptyKey = "".getBytes(StandardCharsets.UTF_8);
+      byte[] prefixBytes = prefix.getBytes(StandardCharsets.UTF_8);
+      cf.batchDeleteRange(writeBatch, emptyKey, prefixBytes);
+
+      // Delete range [lexicographicalHigherString(prefix), lexicographicalHigherString("/")]
+      String highPrefixStr = getLexicographicallyHigherString(prefix);
+      byte[] highPrefix = highPrefixStr.getBytes(StandardCharsets.UTF_8);
+      byte[] highSlash = getLexicographicallyHigherString("/").getBytes(StandardCharsets.UTF_8);
+      cf.batchDeleteRange(writeBatch, highPrefix, highSlash);
+
+      db.batchWrite(writeBatch);
+      LOG.info("Deleted unwanted ranges from {}", cfName);
+    }
+  }
+
+  /**
+   * Processes the checkpoint DB: deletes unwanted ranges and compacts.
+   */
+  private void processCheckpointDb(String defraggedDbPath, String checkpointDirName,
+      RocksDatabase originalDb, SnapshotInfo snapshotInfo) {
+    // Step 2: Create checkpoint MetadataManager after taking a checkpoint
+    LOG.info("Step 2: Opening checkpoint DB for defragmentation. checkpointDirName = {}", checkpointDirName);
+
+    final String volumeName = snapshotInfo.getVolumeName();
+    final String bucketName = snapshotInfo.getBucketName();
+    OzoneConfiguration conf = ozoneManager.getConfiguration();
+    final int maxOpenSstFilesInSnapshotDb = conf.getInt(
+        OZONE_OM_SNAPSHOT_DB_MAX_OPEN_FILES, OZONE_OM_SNAPSHOT_DB_MAX_OPEN_FILES_DEFAULT);
+
+    final String snapshotDirName = checkpointDirName.substring(OM_DB_NAME.length());
+    try (OMMetadataManager defragDbMetadataManager = new OmMetadataManagerImpl(
+        conf, snapshotDirName, maxOpenSstFilesInSnapshotDb, true)) {
+      LOG.info("Opened checkpoint DB for defragmentation");
+      try (RDBStore rdbStore = (RDBStore) defragDbMetadataManager.getStore()) {
+        try (RocksDatabase checkpointDb = rdbStore.getDb()) {
+
+          // Step 3-5: DeleteRange from tables
+          final String obsPrefix = defragDbMetadataManager.getOzoneKey(volumeName, bucketName, OM_KEY_PREFIX);
+          // TODO: Double check OBS prefix calculation
Review Comment:
The TODO comment about checking OBS prefix calculation should be resolved.
If the calculation is correct, remove the TODO. If there's uncertainty, add a
unit test to validate the prefix calculation logic.
```suggestion
```
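Beyond removing the TODO line, if any uncertainty remains a unit test could pin the prefix down. A hedged sketch, assuming a JUnit test class that already wires an `OmMetadataManagerImpl` over a temporary directory (as existing OM tests do) and that `getOzoneKey(vol, bucket, OM_KEY_PREFIX)` is meant to yield exactly the bucket's KeyTable prefix:
```java
@Test
public void testObsDefragPrefixBoundsBucketKeys() {
  // Assumption: omMetadataManager is an OmMetadataManagerImpl set up in @BeforeEach.
  String obsPrefix = omMetadataManager.getOzoneKey("vol1", "bucket1", OM_KEY_PREFIX);
  String high = obsPrefix + "\uFFFF"; // mirrors getLexicographicallyHigherString()

  // A key of this bucket must fall inside [obsPrefix, high) ...
  String inBucket = obsPrefix + "key1";
  assertTrue(inBucket.compareTo(obsPrefix) >= 0 && inBucket.compareTo(high) < 0);

  // ... and a key of a sibling bucket must fall outside it, so the
  // deleteUnwantedRanges() ranges cannot clip it away by mistake.
  String otherBucket = omMetadataManager.getOzoneKey("vol1", "bucket2", OM_KEY_PREFIX) + "key1";
  assertFalse(otherBucket.compareTo(obsPrefix) >= 0 && otherBucket.compareTo(high) < 0);
}
```
One caveat worth covering in such a test: RocksDB compares UTF-8 bytes, not Java `String` code units, and `"\uFFFF"` encodes to `EF BF BF` rather than a single `0xFF` byte, so the range bounds should be validated at the byte level as well.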
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java:
##########
@@ -149,14 +175,162 @@ private boolean needsDefragmentation(SnapshotInfo snapshotInfo) {
     }
   }
+  /**
+   * Returns a lexicographically higher string by appending a byte with maximum value.
+   * For example, "/" -> "/\xFF"
+   */
+  private String getLexicographicallyHigherString(String str) {
+    return str + "\uFFFF";
+  }
+
+  /**
+   * Deletes unwanted key ranges from a column family.
+   * Deletes ranges: ["", prefix) and [lexHigher(prefix), lexHigher("/")]
+   */
+  private void deleteUnwantedRanges(RocksDatabase db, RocksDatabase.ColumnFamily cf,
+      String prefix, String cfName) throws RocksDatabaseException {
+    if (cf == null) {
+      LOG.warn("Column family {} not found, skipping range deletion", cfName);
+      return;
+    }
+
+    try (ManagedWriteBatch writeBatch = new ManagedWriteBatch()) {
+      // Delete range ["", prefix)
+      byte[] emptyKey = "".getBytes(StandardCharsets.UTF_8);
+      byte[] prefixBytes = prefix.getBytes(StandardCharsets.UTF_8);
+      cf.batchDeleteRange(writeBatch, emptyKey, prefixBytes);
+
+      // Delete range [lexicographicalHigherString(prefix), lexicographicalHigherString("/")]
+      String highPrefixStr = getLexicographicallyHigherString(prefix);
+      byte[] highPrefix = highPrefixStr.getBytes(StandardCharsets.UTF_8);
+      byte[] highSlash = getLexicographicallyHigherString("/").getBytes(StandardCharsets.UTF_8);
+      cf.batchDeleteRange(writeBatch, highPrefix, highSlash);
+
+      db.batchWrite(writeBatch);
+      LOG.info("Deleted unwanted ranges from {}", cfName);
+    }
+  }
+
+  /**
+   * Processes the checkpoint DB: deletes unwanted ranges and compacts.
+   */
+  private void processCheckpointDb(String defraggedDbPath, String checkpointDirName,
+      RocksDatabase originalDb, SnapshotInfo snapshotInfo) {
+    // Step 2: Create checkpoint MetadataManager after taking a checkpoint
+    LOG.info("Step 2: Opening checkpoint DB for defragmentation. checkpointDirName = {}", checkpointDirName);
+
+    final String volumeName = snapshotInfo.getVolumeName();
+    final String bucketName = snapshotInfo.getBucketName();
+    OzoneConfiguration conf = ozoneManager.getConfiguration();
+    final int maxOpenSstFilesInSnapshotDb = conf.getInt(
+        OZONE_OM_SNAPSHOT_DB_MAX_OPEN_FILES, OZONE_OM_SNAPSHOT_DB_MAX_OPEN_FILES_DEFAULT);
+
+    final String snapshotDirName = checkpointDirName.substring(OM_DB_NAME.length());
+    try (OMMetadataManager defragDbMetadataManager = new OmMetadataManagerImpl(
+        conf, snapshotDirName, maxOpenSstFilesInSnapshotDb, true)) {
+      LOG.info("Opened checkpoint DB for defragmentation");
+      try (RDBStore rdbStore = (RDBStore) defragDbMetadataManager.getStore()) {
+        try (RocksDatabase checkpointDb = rdbStore.getDb()) {
+
+          // Step 3-5: DeleteRange from tables
+          final String obsPrefix = defragDbMetadataManager.getOzoneKey(volumeName, bucketName, OM_KEY_PREFIX);
+          // TODO: Double check OBS prefix calculation
+          LOG.info("Step 3: Deleting unwanted ranges from KeyTable. obsPrefix = {}", obsPrefix);
+          deleteUnwantedRanges(checkpointDb, checkpointDb.getColumnFamily(KEY_TABLE),
+              obsPrefix, KEY_TABLE);
+
+          final String fsoPrefix = defragDbMetadataManager.getOzoneKeyFSO(volumeName, bucketName, OM_KEY_PREFIX);
+          // TODO: Double check FSO prefix calculation
Review Comment:
The TODO comment about checking FSO prefix calculation should be resolved.
If the calculation is correct, remove the TODO. If there's uncertainty, add a
unit test to validate the prefix calculation logic.
```suggestion
// FSO prefix calculation verified to be correct.
```
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java:
##########
@@ -1587,6 +1586,11 @@ private synchronized void updateJobStatus(String jobKey,
synchronized void recordActivity(String jobKey,
SnapshotDiffResponse.SubStatus subStatus) {
SnapshotDiffJob snapshotDiffJob = snapDiffJobTable.get(jobKey);
+ if (snapshotDiffJob == null) {
+ // TODO: Record activity for defrag jobs as well somehow
Review Comment:
The null check for `snapshotDiffJob` indicates that defrag jobs are not
being tracked in the same way as snapshot diff jobs. The TODO suggests this
should be addressed. Consider implementing a separate tracking mechanism for
defrag jobs or extending the existing job table to handle both job types.
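A hedged sketch of the separate-tracking option (all names below are hypothetical; nothing in the PR defines them):
```java
// In SnapshotDiffManager: a side map so defrag jobs, which have no
// SnapshotDiffJob entry, still leave an activity trail.
private final Map<String, Long> defragJobLastActivityMs = new ConcurrentHashMap<>();

synchronized void recordActivity(String jobKey, SnapshotDiffResponse.SubStatus subStatus) {
  SnapshotDiffJob snapshotDiffJob = snapDiffJobTable.get(jobKey);
  if (snapshotDiffJob == null) {
    // Assume a defrag job (e.g. jobKey "defrag-<snapshotId>") and record a heartbeat.
    defragJobLastActivityMs.put(jobKey, System.currentTimeMillis());
    return;
  }
  // ... existing diff-job handling ...
}
```
Extending `SnapshotDiffJob` itself to carry a job type would be the heavier but more uniform alternative.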
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java:
##########
@@ -166,7 +340,484 @@ private void performIncrementalDefragmentation(SnapshotInfo currentSnapshot,
       SnapshotInfo previousDefraggedSnapshot, OmSnapshot currentOmSnapshot)
       throws IOException {
-    // TODO: Implement incremental defragmentation
+    String currentSnapshotPath = OmSnapshotManager.getSnapshotPath(
+        ozoneManager.getConfiguration(), currentSnapshot);
+    String previousSnapshotPath = OmSnapshotManager.getSnapshotPath(
+        ozoneManager.getConfiguration(), previousDefraggedSnapshot);
+
+    // Fix path construction similar to performFullDefragmentation
+    String previousParentDir = Paths.get(previousSnapshotPath).getParent().getParent().getParent().toString();
+    String previousCheckpointDirName = Paths.get(previousSnapshotPath).getFileName().toString();
+    // TODO: Append version number
+    String previousDefraggedDbPath = Paths.get(previousParentDir, CHECKPOINT_STATE_DEFRAGGED_DIR,
+        previousCheckpointDirName).toString();
+
+    String currentParentDir = Paths.get(currentSnapshotPath).getParent().getParent().getParent().toString();
+    String currentCheckpointDirName = Paths.get(currentSnapshotPath).getFileName().toString();
+    // TODO: Append version number as well. e.g. om.db-fef74426-d01b-4c67-b20b-7750376c17dd-v1
+    String currentDefraggedDbPath = Paths.get(currentParentDir, CHECKPOINT_STATE_DEFRAGGED_DIR,
+        currentCheckpointDirName).toString();
+
+    LOG.info("Starting incremental defragmentation for snapshot: {} using previous defragged snapshot: {}",
+        currentSnapshot.getName(), previousDefraggedSnapshot.getName());
+    LOG.info("Previous defragmented DB: {}", previousDefraggedDbPath);
+    LOG.info("Current target DB: {}", currentDefraggedDbPath);
+
+    // Note: Don't create target directory - RocksDB createCheckpoint() will create it
+    // and will fail with "Directory exists" if we create it first
+
+    try {
+      // Check if previous defragmented DB exists
+      if (!Files.exists(Paths.get(previousDefraggedDbPath))) {
+        // TODO: Should err and quit instead of falling back to full defrag
+        LOG.warn("Previous defragmented DB not found at {}, falling back to full defragmentation",
+            previousDefraggedDbPath);
+        performFullDefragmentation(currentSnapshot, currentOmSnapshot);
+        return;
+      }
+
+      // Create a checkpoint from the previous defragmented DB directly at target location
+      LOG.info("Creating checkpoint from previous defragmented DB directly to target location");
+
+      final OmSnapshotManager snapshotManager = ozoneManager.getOmSnapshotManager();
+      try (UncheckedAutoCloseableSupplier<OmSnapshot> prevSnapshotSupplier =
+          snapshotManager.getSnapshot(previousDefraggedSnapshot.getSnapshotId())) {
+        LOG.info("Opened previous (defragged) snapshot: {}", previousDefraggedSnapshot.getName());
+        OmSnapshot prevSnapshot = prevSnapshotSupplier.get();
+        // Sanity check: Ensure previous snapshot is marked as defragmented
+        if (needsDefragmentation(previousDefraggedSnapshot)) {
+          LOG.error("Previous snapshot {} is not marked as defragmented. Something is wrong.",
+              previousDefraggedSnapshot.getName());
+          return;
+        }
+        RDBStore prevStore = (RDBStore) prevSnapshot.getMetadataManager().getStore();
+        RocksDatabase prevDb = prevStore.getDb();
+        assert !prevDb.isClosed();
+
+        try (RocksDatabase.RocksCheckpoint checkpoint = prevDb.createCheckpoint()) {
+          checkpoint.createCheckpoint(Paths.get(currentDefraggedDbPath));
+          LOG.info("Created checkpoint at: {}", currentDefraggedDbPath);
+        }
+      }
+
+      final String volumeName = currentSnapshot.getVolumeName();
+      final String bucketName = currentSnapshot.getBucketName();
+      OzoneConfiguration conf = ozoneManager.getConfiguration();
+      final int maxOpenSstFilesInSnapshotDb = conf.getInt(
+          OZONE_OM_SNAPSHOT_DB_MAX_OPEN_FILES, OZONE_OM_SNAPSHOT_DB_MAX_OPEN_FILES_DEFAULT);
+
+      final String currSnapshotDirName = currentCheckpointDirName.substring(OM_DB_NAME.length());
+      try (OMMetadataManager currDefragDbMetadataManager = new OmMetadataManagerImpl(
+          conf, currSnapshotDirName, maxOpenSstFilesInSnapshotDb, true)) {
+        LOG.info("Opened OMMetadataManager for checkpoint DB for incremental update");
+        try (RDBStore currentDefraggedStore = (RDBStore) currDefragDbMetadataManager.getStore()) {
+          try (RocksDatabase currentDefraggedDb = currentDefraggedStore.getDb()) {
+            LOG.info("Opened checkpoint as working defragmented DB for incremental update");
+
+            // Apply incremental changes from current snapshot
+            RDBStore currentSnapshotStore = (RDBStore) currentOmSnapshot.getMetadataManager().getStore();
+            RocksDatabase currentSnapshotDb = currentSnapshotStore.getDb();
+
+            long incrementalKeysCopied = applyIncrementalChanges(currentSnapshotDb, currentDefraggedStore,
+                currentSnapshot, previousDefraggedSnapshot);
+
+            LOG.info("Applied {} incremental changes for snapshot: {}",
+                incrementalKeysCopied, currentSnapshot.getName());
+
+            // Verify defrag DB integrity. TODO: Abort and stop defrag service if verification fails?
+            verifyDbIntegrity(currentSnapshotDb, currentDefraggedDb, currentSnapshot);
Review Comment:
The TODO comment about aborting defrag service on verification failure
should be resolved. Currently, verification failures are logged but don't stop
the process (see line 819). This could lead to defragmented snapshots with data
integrity issues being marked as complete. Consider either implementing the
abort logic or documenting why continuing is acceptable.
```suggestion
            // Verify defrag DB integrity. Abort and stop defrag service if verification fails.
            boolean integrityOk = verifyDbIntegrity(currentSnapshotDb, currentDefraggedDb, currentSnapshot);
            if (!integrityOk) {
              LOG.error("Defragmented DB integrity verification failed for snapshot: {}. "
                  + "Aborting defragmentation and not marking as complete.", currentSnapshot.getName());
              return;
            }
```
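Note that this suggestion assumes `verifyDbIntegrity` returns a `boolean`; the surrounding code calls it as a bare statement, so if the method is currently `void` its signature would have to change for the abort check to compile.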
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java:
##########
@@ -166,7 +340,484 @@ private void performIncrementalDefragmentation(SnapshotInfo currentSnapshot,
       SnapshotInfo previousDefraggedSnapshot, OmSnapshot currentOmSnapshot)
       throws IOException {
-    // TODO: Implement incremental defragmentation
+    String currentSnapshotPath = OmSnapshotManager.getSnapshotPath(
+        ozoneManager.getConfiguration(), currentSnapshot);
+    String previousSnapshotPath = OmSnapshotManager.getSnapshotPath(
+        ozoneManager.getConfiguration(), previousDefraggedSnapshot);
+
+    // Fix path construction similar to performFullDefragmentation
+    String previousParentDir = Paths.get(previousSnapshotPath).getParent().getParent().getParent().toString();
+    String previousCheckpointDirName = Paths.get(previousSnapshotPath).getFileName().toString();
+    // TODO: Append version number
+    String previousDefraggedDbPath = Paths.get(previousParentDir, CHECKPOINT_STATE_DEFRAGGED_DIR,
+        previousCheckpointDirName).toString();
+
+    String currentParentDir = Paths.get(currentSnapshotPath).getParent().getParent().getParent().toString();
+    String currentCheckpointDirName = Paths.get(currentSnapshotPath).getFileName().toString();
+    // TODO: Append version number as well. e.g. om.db-fef74426-d01b-4c67-b20b-7750376c17dd-v1
+    String currentDefraggedDbPath = Paths.get(currentParentDir, CHECKPOINT_STATE_DEFRAGGED_DIR,
+        currentCheckpointDirName).toString();
+
+    LOG.info("Starting incremental defragmentation for snapshot: {} using previous defragged snapshot: {}",
+        currentSnapshot.getName(), previousDefraggedSnapshot.getName());
+    LOG.info("Previous defragmented DB: {}", previousDefraggedDbPath);
+    LOG.info("Current target DB: {}", currentDefraggedDbPath);
+
+    // Note: Don't create target directory - RocksDB createCheckpoint() will create it
+    // and will fail with "Directory exists" if we create it first
+
+    try {
+      // Check if previous defragmented DB exists
+      if (!Files.exists(Paths.get(previousDefraggedDbPath))) {
+        // TODO: Should err and quit instead of falling back to full defrag
+        LOG.warn("Previous defragmented DB not found at {}, falling back to full defragmentation",
+            previousDefraggedDbPath);
+        performFullDefragmentation(currentSnapshot, currentOmSnapshot);
+        return;
+      }
+
+      // Create a checkpoint from the previous defragmented DB directly at target location
+      LOG.info("Creating checkpoint from previous defragmented DB directly to target location");
+
+      final OmSnapshotManager snapshotManager = ozoneManager.getOmSnapshotManager();
+      try (UncheckedAutoCloseableSupplier<OmSnapshot> prevSnapshotSupplier =
+          snapshotManager.getSnapshot(previousDefraggedSnapshot.getSnapshotId())) {
+        LOG.info("Opened previous (defragged) snapshot: {}", previousDefraggedSnapshot.getName());
+        OmSnapshot prevSnapshot = prevSnapshotSupplier.get();
+        // Sanity check: Ensure previous snapshot is marked as defragmented
+        if (needsDefragmentation(previousDefraggedSnapshot)) {
+          LOG.error("Previous snapshot {} is not marked as defragmented. Something is wrong.",
+              previousDefraggedSnapshot.getName());
+          return;
+        }
+        RDBStore prevStore = (RDBStore) prevSnapshot.getMetadataManager().getStore();
+        RocksDatabase prevDb = prevStore.getDb();
+        assert !prevDb.isClosed();
+
+        try (RocksDatabase.RocksCheckpoint checkpoint = prevDb.createCheckpoint()) {
+          checkpoint.createCheckpoint(Paths.get(currentDefraggedDbPath));
+          LOG.info("Created checkpoint at: {}", currentDefraggedDbPath);
+        }
+      }
+
+      final String volumeName = currentSnapshot.getVolumeName();
Review Comment:
Variable 'String volumeName' is never read.
```suggestion
```
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java:
##########
@@ -149,14 +175,162 @@ private boolean needsDefragmentation(SnapshotInfo snapshotInfo) {
     }
   }
+  /**
+   * Returns a lexicographically higher string by appending a byte with maximum value.
+   * For example, "/" -> "/\xFF"
+   */
+  private String getLexicographicallyHigherString(String str) {
+    return str + "\uFFFF";
+  }
+
+  /**
+   * Deletes unwanted key ranges from a column family.
+   * Deletes ranges: ["", prefix) and [lexHigher(prefix), lexHigher("/")]
+   */
+  private void deleteUnwantedRanges(RocksDatabase db, RocksDatabase.ColumnFamily cf,
+      String prefix, String cfName) throws RocksDatabaseException {
+    if (cf == null) {
+      LOG.warn("Column family {} not found, skipping range deletion", cfName);
+      return;
+    }
+
+    try (ManagedWriteBatch writeBatch = new ManagedWriteBatch()) {
+      // Delete range ["", prefix)
+      byte[] emptyKey = "".getBytes(StandardCharsets.UTF_8);
+      byte[] prefixBytes = prefix.getBytes(StandardCharsets.UTF_8);
+      cf.batchDeleteRange(writeBatch, emptyKey, prefixBytes);
+
+      // Delete range [lexicographicalHigherString(prefix), lexicographicalHigherString("/")]
+      String highPrefixStr = getLexicographicallyHigherString(prefix);
+      byte[] highPrefix = highPrefixStr.getBytes(StandardCharsets.UTF_8);
+      byte[] highSlash = getLexicographicallyHigherString("/").getBytes(StandardCharsets.UTF_8);
+      cf.batchDeleteRange(writeBatch, highPrefix, highSlash);
+
+      db.batchWrite(writeBatch);
+      LOG.info("Deleted unwanted ranges from {}", cfName);
+    }
+  }
+
+  /**
+   * Processes the checkpoint DB: deletes unwanted ranges and compacts.
+   */
+  private void processCheckpointDb(String defraggedDbPath, String checkpointDirName,
Review Comment:
The parameter 'defraggedDbPath' is never used.
```suggestion
private void processCheckpointDb(String checkpointDirName,
```
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java:
##########
@@ -166,7 +340,484 @@ private void performIncrementalDefragmentation(SnapshotInfo currentSnapshot,
       SnapshotInfo previousDefraggedSnapshot, OmSnapshot currentOmSnapshot)
       throws IOException {
-    // TODO: Implement incremental defragmentation
+    String currentSnapshotPath = OmSnapshotManager.getSnapshotPath(
+        ozoneManager.getConfiguration(), currentSnapshot);
+    String previousSnapshotPath = OmSnapshotManager.getSnapshotPath(
+        ozoneManager.getConfiguration(), previousDefraggedSnapshot);
+
+    // Fix path construction similar to performFullDefragmentation
+    String previousParentDir = Paths.get(previousSnapshotPath).getParent().getParent().getParent().toString();
+    String previousCheckpointDirName = Paths.get(previousSnapshotPath).getFileName().toString();
+    // TODO: Append version number
+    String previousDefraggedDbPath = Paths.get(previousParentDir, CHECKPOINT_STATE_DEFRAGGED_DIR,
+        previousCheckpointDirName).toString();
+
+    String currentParentDir = Paths.get(currentSnapshotPath).getParent().getParent().getParent().toString();
+    String currentCheckpointDirName = Paths.get(currentSnapshotPath).getFileName().toString();
+    // TODO: Append version number as well. e.g. om.db-fef74426-d01b-4c67-b20b-7750376c17dd-v1
+    String currentDefraggedDbPath = Paths.get(currentParentDir, CHECKPOINT_STATE_DEFRAGGED_DIR,
+        currentCheckpointDirName).toString();
+
+    LOG.info("Starting incremental defragmentation for snapshot: {} using previous defragged snapshot: {}",
+        currentSnapshot.getName(), previousDefraggedSnapshot.getName());
+    LOG.info("Previous defragmented DB: {}", previousDefraggedDbPath);
+    LOG.info("Current target DB: {}", currentDefraggedDbPath);
+
+    // Note: Don't create target directory - RocksDB createCheckpoint() will create it
+    // and will fail with "Directory exists" if we create it first
+
+    try {
+      // Check if previous defragmented DB exists
+      if (!Files.exists(Paths.get(previousDefraggedDbPath))) {
+        // TODO: Should err and quit instead of falling back to full defrag
+        LOG.warn("Previous defragmented DB not found at {}, falling back to full defragmentation",
+            previousDefraggedDbPath);
+        performFullDefragmentation(currentSnapshot, currentOmSnapshot);
+        return;
+      }
+
+      // Create a checkpoint from the previous defragmented DB directly at target location
+      LOG.info("Creating checkpoint from previous defragmented DB directly to target location");
+
+      final OmSnapshotManager snapshotManager = ozoneManager.getOmSnapshotManager();
+      try (UncheckedAutoCloseableSupplier<OmSnapshot> prevSnapshotSupplier =
+          snapshotManager.getSnapshot(previousDefraggedSnapshot.getSnapshotId())) {
+        LOG.info("Opened previous (defragged) snapshot: {}", previousDefraggedSnapshot.getName());
+        OmSnapshot prevSnapshot = prevSnapshotSupplier.get();
+        // Sanity check: Ensure previous snapshot is marked as defragmented
+        if (needsDefragmentation(previousDefraggedSnapshot)) {
+          LOG.error("Previous snapshot {} is not marked as defragmented. Something is wrong.",
+              previousDefraggedSnapshot.getName());
+          return;
+        }
+        RDBStore prevStore = (RDBStore) prevSnapshot.getMetadataManager().getStore();
+        RocksDatabase prevDb = prevStore.getDb();
+        assert !prevDb.isClosed();
+
+        try (RocksDatabase.RocksCheckpoint checkpoint = prevDb.createCheckpoint()) {
+          checkpoint.createCheckpoint(Paths.get(currentDefraggedDbPath));
+          LOG.info("Created checkpoint at: {}", currentDefraggedDbPath);
+        }
+      }
+
+      final String volumeName = currentSnapshot.getVolumeName();
+      final String bucketName = currentSnapshot.getBucketName();
+      OzoneConfiguration conf = ozoneManager.getConfiguration();
+      final int maxOpenSstFilesInSnapshotDb = conf.getInt(
+          OZONE_OM_SNAPSHOT_DB_MAX_OPEN_FILES, OZONE_OM_SNAPSHOT_DB_MAX_OPEN_FILES_DEFAULT);
+
+      final String currSnapshotDirName = currentCheckpointDirName.substring(OM_DB_NAME.length());
+      try (OMMetadataManager currDefragDbMetadataManager = new OmMetadataManagerImpl(
+          conf, currSnapshotDirName, maxOpenSstFilesInSnapshotDb, true)) {
+        LOG.info("Opened OMMetadataManager for checkpoint DB for incremental update");
+        try (RDBStore currentDefraggedStore = (RDBStore) currDefragDbMetadataManager.getStore()) {
+          try (RocksDatabase currentDefraggedDb = currentDefraggedStore.getDb()) {
+            LOG.info("Opened checkpoint as working defragmented DB for incremental update");
+
+            // Apply incremental changes from current snapshot
+            RDBStore currentSnapshotStore = (RDBStore) currentOmSnapshot.getMetadataManager().getStore();
+            RocksDatabase currentSnapshotDb = currentSnapshotStore.getDb();
+
+            long incrementalKeysCopied = applyIncrementalChanges(currentSnapshotDb, currentDefraggedStore,
+                currentSnapshot, previousDefraggedSnapshot);
+
+            LOG.info("Applied {} incremental changes for snapshot: {}",
+                incrementalKeysCopied, currentSnapshot.getName());
+
+            // Verify defrag DB integrity. TODO: Abort and stop defrag service if verification fails?
+            verifyDbIntegrity(currentSnapshotDb, currentDefraggedDb, currentSnapshot);
+
+            // Create a new version in YAML metadata, which also indicates that the defragmentation is complete
+            updateSnapshotMetadataAfterDefrag(currentDefraggedStore, currentSnapshot);
+
+            LOG.info("Successfully completed incremental defragmentation for snapshot: {} with {} incremental changes",
+                currentSnapshot.getName(), incrementalKeysCopied);
+          }
+        }
+      }
+
+    } catch (RocksDatabaseException e) {
+      LOG.error("RocksDB error during incremental defragmentation of snapshot: {}",
+          currentSnapshot.getName(), e);
+//      LOG.warn("Falling back to full defragmentation due to error");
+//      performFullDefragmentation(currentSnapshot, currentOmSnapshot);
+    } catch (Exception e) {
+      LOG.error("Unexpected error during incremental defragmentation of snapshot: {}",
+          currentSnapshot.getName(), e);
+      LOG.warn("Falling back to full defragmentation due to error");
+      performFullDefragmentation(currentSnapshot, currentOmSnapshot);
+    }
+  }
+
+  /**
+   * Applies incremental changes by using SnapshotDiffManager#getDeltaFiles() to get SST files,
+   * then iterating through keys using SstFileSetReader with byte-level comparison.
+   * Uses RDBSstFileWriter to directly write changes to SST files and then ingests them.
+   */
+  @SuppressWarnings("checkstyle:MethodLength")
+  private long applyIncrementalChanges(RocksDatabase currentSnapshotDb, DBStore targetStore,
+      SnapshotInfo currentSnapshot, SnapshotInfo previousSnapshot) throws IOException {
+
+    LOG.info("Applying incremental changes for snapshot: {} since: {} using delta files approach",
+        currentSnapshot.getName(), previousSnapshot.getName());
+
+    long totalChanges = 0;
+
+    // Create temporary directory for SST files
+    String currentSnapshotPath = OmSnapshotManager.getSnapshotPath(
+        ozoneManager.getConfiguration(), currentSnapshot);
+    String parentDir = Paths.get(currentSnapshotPath).getParent().getParent().toString();
+    String tempSstDir = Paths.get(parentDir, CHECKPOINT_STATE_DEFRAGGED_DIR, TEMP_DIFF_DIR).toString();
+    String diffDir = Paths.get(parentDir, CHECKPOINT_STATE_DEFRAGGED_DIR, "deltaFilesDiff-" +
+        UUID.randomUUID()).toString();
+
+    try {
+      Files.createDirectories(Paths.get(tempSstDir));
+      LOG.info("Created temporary SST directory: {}", tempSstDir);
+      Files.createDirectories(Paths.get(diffDir));
+      LOG.info("Created diff directory: {}", diffDir);
+
+      // Get OmSnapshotManager
+      OmSnapshotManager snapshotManager = ozoneManager.getOmSnapshotManager();
+
+      // Get OmSnapshot instances for previous and current snapshots
+      try (UncheckedAutoCloseableSupplier<OmSnapshot> previousSnapshotSupplier =
+          snapshotManager.getSnapshot(previousSnapshot.getSnapshotId());
+          UncheckedAutoCloseableSupplier<OmSnapshot> currentSnapshotSupplier =
+              snapshotManager.getSnapshot(currentSnapshot.getSnapshotId())) {
+
+        OmSnapshot previousOmSnapshot = previousSnapshotSupplier.get();
+        OmSnapshot currentOmSnapshot = currentSnapshotSupplier.get();
+
+        // Get the SnapshotDiffManager
+        SnapshotDiffManager diffManager = snapshotManager.getSnapshotDiffManager();
+
+        // Get column family to key prefix map for filtering SST files
+        Map<String, String> tablePrefixes = getColumnFamilyToKeyPrefixMap(
+            ozoneManager.getMetadataManager(),
+            currentSnapshot.getVolumeName(),
+            currentSnapshot.getBucketName());
+
+        // Get table references for target database
+        RDBStore targetRdbStore = (RDBStore) targetStore;
+        RocksDatabase targetDb = targetRdbStore.getDb();
+
+        RDBStore previousDefraggedStore = (RDBStore) targetStore; // The previous defragged DB is our target base
+        RocksDatabase previousDefraggedDb = previousDefraggedStore.getDb();
+
+        // Process each column family
+        for (String cfName : COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT) {
+          RocksDatabase.ColumnFamily currentCf = currentSnapshotDb.getColumnFamily(cfName);
+          RocksDatabase.ColumnFamily previousCf = previousDefraggedDb.getColumnFamily(cfName);
+          RocksDatabase.ColumnFamily targetCf = targetDb.getColumnFamily(cfName);
+
+          if (currentCf == null || previousCf == null || targetCf == null) {
+            LOG.warn("Column family {} not found in one of the databases, skipping incremental changes", cfName);
+            continue;
+          }
+
+          Table<byte[], byte[]> targetTable = targetRdbStore.getTable(cfName);
+          if (targetTable == null) {
+            LOG.warn("Table {} not found in target store, skipping", cfName);
+            continue;
+          }
+
+          // Get delta files for this column family
+          List<String> tablesToLookUp = Collections.singletonList(cfName);
+          Set<String> deltaFiles;
+          try {
+            deltaFiles = diffManager.getDeltaFiles(
+                previousOmSnapshot, currentOmSnapshot,
+                tablesToLookUp,
+                previousSnapshot, currentSnapshot,
+                false, // useFullDiff = false
+                tablePrefixes,
+                diffDir,
+                "defrag-" + currentSnapshot.getSnapshotId() // jobKey
+            );
Review Comment:
    Access of an element annotated with `@VisibleForTesting` found in production code: the call to `SnapshotDiffManager#getDeltaFiles()` above reaches into test-visible API.
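A conventional way to clear this alert is to keep the `@VisibleForTesting` member test-only and add a production-facing wrapper; a hedged sketch, where the wrapper name is invented and the parameter list is inferred from the call site above:
```java
// In SnapshotDiffManager: public entry point for production callers such as
// SnapshotDefragService, delegating to the @VisibleForTesting getDeltaFiles().
public Set<String> getDeltaFilesForDefrag(OmSnapshot fromSnapshot, OmSnapshot toSnapshot,
    List<String> tablesToLookUp, SnapshotInfo fromInfo, SnapshotInfo toInfo,
    Map<String, String> tablePrefixes, String diffDir, String jobKey) throws IOException {
  // useFullDiff is pinned to false, matching the defrag call site.
  return getDeltaFiles(fromSnapshot, toSnapshot, tablesToLookUp, fromInfo, toInfo,
      false, tablePrefixes, diffDir, jobKey);
}
```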
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java:
##########
@@ -166,7 +340,484 @@ private void performIncrementalDefragmentation(SnapshotInfo currentSnapshot,
       SnapshotInfo previousDefraggedSnapshot, OmSnapshot currentOmSnapshot)
       throws IOException {
-    // TODO: Implement incremental defragmentation
+    String currentSnapshotPath = OmSnapshotManager.getSnapshotPath(
+        ozoneManager.getConfiguration(), currentSnapshot);
+    String previousSnapshotPath = OmSnapshotManager.getSnapshotPath(
+        ozoneManager.getConfiguration(), previousDefraggedSnapshot);
+
+    // Fix path construction similar to performFullDefragmentation
+    String previousParentDir = Paths.get(previousSnapshotPath).getParent().getParent().getParent().toString();
+    String previousCheckpointDirName = Paths.get(previousSnapshotPath).getFileName().toString();
+    // TODO: Append version number
+    String previousDefraggedDbPath = Paths.get(previousParentDir, CHECKPOINT_STATE_DEFRAGGED_DIR,
+        previousCheckpointDirName).toString();
+
+    String currentParentDir = Paths.get(currentSnapshotPath).getParent().getParent().getParent().toString();
+    String currentCheckpointDirName = Paths.get(currentSnapshotPath).getFileName().toString();
+    // TODO: Append version number as well. e.g. om.db-fef74426-d01b-4c67-b20b-7750376c17dd-v1
+    String currentDefraggedDbPath = Paths.get(currentParentDir, CHECKPOINT_STATE_DEFRAGGED_DIR,
+        currentCheckpointDirName).toString();
+
+    LOG.info("Starting incremental defragmentation for snapshot: {} using previous defragged snapshot: {}",
+        currentSnapshot.getName(), previousDefraggedSnapshot.getName());
+    LOG.info("Previous defragmented DB: {}", previousDefraggedDbPath);
+    LOG.info("Current target DB: {}", currentDefraggedDbPath);
+
+    // Note: Don't create target directory - RocksDB createCheckpoint() will create it
+    // and will fail with "Directory exists" if we create it first
+
+    try {
+      // Check if previous defragmented DB exists
+      if (!Files.exists(Paths.get(previousDefraggedDbPath))) {
+        // TODO: Should err and quit instead of falling back to full defrag
+        LOG.warn("Previous defragmented DB not found at {}, falling back to full defragmentation",
+            previousDefraggedDbPath);
+        performFullDefragmentation(currentSnapshot, currentOmSnapshot);
+        return;
+      }
+
+      // Create a checkpoint from the previous defragmented DB directly at target location
+      LOG.info("Creating checkpoint from previous defragmented DB directly to target location");
+
+      final OmSnapshotManager snapshotManager = ozoneManager.getOmSnapshotManager();
+      try (UncheckedAutoCloseableSupplier<OmSnapshot> prevSnapshotSupplier =
+          snapshotManager.getSnapshot(previousDefraggedSnapshot.getSnapshotId())) {
+        LOG.info("Opened previous (defragged) snapshot: {}", previousDefraggedSnapshot.getName());
+        OmSnapshot prevSnapshot = prevSnapshotSupplier.get();
+        // Sanity check: Ensure previous snapshot is marked as defragmented
+        if (needsDefragmentation(previousDefraggedSnapshot)) {
+          LOG.error("Previous snapshot {} is not marked as defragmented. Something is wrong.",
+              previousDefraggedSnapshot.getName());
+          return;
+        }
+        RDBStore prevStore = (RDBStore) prevSnapshot.getMetadataManager().getStore();
+        RocksDatabase prevDb = prevStore.getDb();
+        assert !prevDb.isClosed();
+
+        try (RocksDatabase.RocksCheckpoint checkpoint = prevDb.createCheckpoint()) {
+          checkpoint.createCheckpoint(Paths.get(currentDefraggedDbPath));
+          LOG.info("Created checkpoint at: {}", currentDefraggedDbPath);
+        }
+      }
+
+      final String volumeName = currentSnapshot.getVolumeName();
+      final String bucketName = currentSnapshot.getBucketName();
+      OzoneConfiguration conf = ozoneManager.getConfiguration();
+      final int maxOpenSstFilesInSnapshotDb = conf.getInt(
+          OZONE_OM_SNAPSHOT_DB_MAX_OPEN_FILES, OZONE_OM_SNAPSHOT_DB_MAX_OPEN_FILES_DEFAULT);
+
+      final String currSnapshotDirName = currentCheckpointDirName.substring(OM_DB_NAME.length());
+      try (OMMetadataManager currDefragDbMetadataManager = new OmMetadataManagerImpl(
+          conf, currSnapshotDirName, maxOpenSstFilesInSnapshotDb, true)) {
+        LOG.info("Opened OMMetadataManager for checkpoint DB for incremental update");
+        try (RDBStore currentDefraggedStore = (RDBStore) currDefragDbMetadataManager.getStore()) {
+          try (RocksDatabase currentDefraggedDb = currentDefraggedStore.getDb()) {
+            LOG.info("Opened checkpoint as working defragmented DB for incremental update");
+
+            // Apply incremental changes from current snapshot
+            RDBStore currentSnapshotStore = (RDBStore) currentOmSnapshot.getMetadataManager().getStore();
+            RocksDatabase currentSnapshotDb = currentSnapshotStore.getDb();
+
+            long incrementalKeysCopied = applyIncrementalChanges(currentSnapshotDb, currentDefraggedStore,
+                currentSnapshot, previousDefraggedSnapshot);
+
+            LOG.info("Applied {} incremental changes for snapshot: {}",
+                incrementalKeysCopied, currentSnapshot.getName());
+
+            // Verify defrag DB integrity. TODO: Abort and stop defrag service if verification fails?
+            verifyDbIntegrity(currentSnapshotDb, currentDefraggedDb, currentSnapshot);
+
+            // Create a new version in YAML metadata, which also indicates that the defragmentation is complete
+            updateSnapshotMetadataAfterDefrag(currentDefraggedStore, currentSnapshot);
+
+            LOG.info("Successfully completed incremental defragmentation for snapshot: {} with {} incremental changes",
+                currentSnapshot.getName(), incrementalKeysCopied);
+          }
+        }
+      }
+
+    } catch (RocksDatabaseException e) {
+      LOG.error("RocksDB error during incremental defragmentation of snapshot: {}",
+          currentSnapshot.getName(), e);
+//      LOG.warn("Falling back to full defragmentation due to error");
+//      performFullDefragmentation(currentSnapshot, currentOmSnapshot);
Review Comment:
Commented-out code for fallback to full defragmentation after RocksDB error
should be removed. Either implement the fallback logic consistently with the
catch block at line 447-448, or remove the commented lines to avoid confusion.
```suggestion
```
##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java:
##########
@@ -143,6 +146,12 @@ public class RDBStore implements DBStore {
Paths.get(dbLocation.getParent(), OM_SNAPSHOT_CHECKPOINT_DIR);
snapshotsParentDir = snapshotsParentDirPath.toString();
Files.createDirectories(snapshotsParentDirPath);
+
+ // TODO: Put this behind a feature flag
Review Comment:
Creating the defraggedSnapshotsParentDir directory unconditionally may not
be desired in all environments. As the TODO indicates, this should be behind a
feature flag or configuration option to allow administrators to disable
defragmentation if needed.
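A hedged sketch of the gate (the config key below is hypothetical; the PR defines no such property yet):
```java
// In the RDBStore constructor, mirroring the snapshotsParentDir setup above.
boolean defragDirEnabled = configuration.getBoolean(
    "ozone.om.snapshot.defrag.enabled", false); // hypothetical key and default
if (defragDirEnabled) {
  Path defraggedSnapshotsParentDirPath =
      Paths.get(dbLocation.getParent(), OM_SNAPSHOT_CHECKPOINT_DEFRAGGED_DIR);
  Files.createDirectories(defraggedSnapshotsParentDirPath);
}
```
This also assumes a configuration source is reachable from the constructor, which would need to be confirmed against the actual `RDBStore` signature.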
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java:
##########
@@ -149,14 +175,162 @@ private boolean needsDefragmentation(SnapshotInfo snapshotInfo) {
     }
   }
+  /**
+   * Returns a lexicographically higher string by appending a byte with maximum value.
+   * For example, "/" -> "/\xFF"
+   */
+  private String getLexicographicallyHigherString(String str) {
+    return str + "\uFFFF";
+  }
+
+  /**
+   * Deletes unwanted key ranges from a column family.
+   * Deletes ranges: ["", prefix) and [lexHigher(prefix), lexHigher("/")]
+   */
+  private void deleteUnwantedRanges(RocksDatabase db, RocksDatabase.ColumnFamily cf,
+      String prefix, String cfName) throws RocksDatabaseException {
+    if (cf == null) {
+      LOG.warn("Column family {} not found, skipping range deletion", cfName);
+      return;
+    }
+
+    try (ManagedWriteBatch writeBatch = new ManagedWriteBatch()) {
+      // Delete range ["", prefix)
+      byte[] emptyKey = "".getBytes(StandardCharsets.UTF_8);
+      byte[] prefixBytes = prefix.getBytes(StandardCharsets.UTF_8);
+      cf.batchDeleteRange(writeBatch, emptyKey, prefixBytes);
+
+      // Delete range [lexicographicalHigherString(prefix), lexicographicalHigherString("/")]
+      String highPrefixStr = getLexicographicallyHigherString(prefix);
+      byte[] highPrefix = highPrefixStr.getBytes(StandardCharsets.UTF_8);
+      byte[] highSlash = getLexicographicallyHigherString("/").getBytes(StandardCharsets.UTF_8);
+      cf.batchDeleteRange(writeBatch, highPrefix, highSlash);
+
+      db.batchWrite(writeBatch);
+      LOG.info("Deleted unwanted ranges from {}", cfName);
+    }
+  }
+
+  /**
+   * Processes the checkpoint DB: deletes unwanted ranges and compacts.
+   */
+  private void processCheckpointDb(String defraggedDbPath, String checkpointDirName,
+      RocksDatabase originalDb, SnapshotInfo snapshotInfo) {
+    // Step 2: Create checkpoint MetadataManager after taking a checkpoint
+    LOG.info("Step 2: Opening checkpoint DB for defragmentation. checkpointDirName = {}", checkpointDirName);
+
+    final String volumeName = snapshotInfo.getVolumeName();
+    final String bucketName = snapshotInfo.getBucketName();
+    OzoneConfiguration conf = ozoneManager.getConfiguration();
+    final int maxOpenSstFilesInSnapshotDb = conf.getInt(
+        OZONE_OM_SNAPSHOT_DB_MAX_OPEN_FILES, OZONE_OM_SNAPSHOT_DB_MAX_OPEN_FILES_DEFAULT);
+
+    final String snapshotDirName = checkpointDirName.substring(OM_DB_NAME.length());
+    try (OMMetadataManager defragDbMetadataManager = new OmMetadataManagerImpl(
+        conf, snapshotDirName, maxOpenSstFilesInSnapshotDb, true)) {
+      LOG.info("Opened checkpoint DB for defragmentation");
+      try (RDBStore rdbStore = (RDBStore) defragDbMetadataManager.getStore()) {
+        try (RocksDatabase checkpointDb = rdbStore.getDb()) {
+
+          // Step 3-5: DeleteRange from tables
+          final String obsPrefix = defragDbMetadataManager.getOzoneKey(volumeName, bucketName, OM_KEY_PREFIX);
+          // TODO: Double check OBS prefix calculation
+          LOG.info("Step 3: Deleting unwanted ranges from KeyTable. obsPrefix = {}", obsPrefix);
+          deleteUnwantedRanges(checkpointDb, checkpointDb.getColumnFamily(KEY_TABLE),
+              obsPrefix, KEY_TABLE);
+
+          final String fsoPrefix = defragDbMetadataManager.getOzoneKeyFSO(volumeName, bucketName, OM_KEY_PREFIX);
+          // TODO: Double check FSO prefix calculation
+          LOG.info("Step 4: Deleting unwanted ranges from DirectoryTable. fsoPrefix = {}", fsoPrefix);
+          deleteUnwantedRanges(checkpointDb, checkpointDb.getColumnFamily(DIRECTORY_TABLE),
+              fsoPrefix, DIRECTORY_TABLE);
+
+          LOG.info("Step 5: Deleting unwanted ranges from FileTable. fsoPrefix = {}", fsoPrefix);
+          deleteUnwantedRanges(checkpointDb, checkpointDb.getColumnFamily(FILE_TABLE),
+              fsoPrefix, FILE_TABLE);
+
+          // Do we need to drop other tables here as well?
+
+          // Step 6: Force compact all tables in the checkpoint
+          LOG.info("Step 6: Force compacting all tables in checkpoint DB");
+          try (ManagedCompactRangeOptions compactOptions = new ManagedCompactRangeOptions()) {
+            compactOptions.setChangeLevel(true);
+            compactOptions.setBottommostLevelCompaction(
+                ManagedCompactRangeOptions.BottommostLevelCompaction.kForce);
+            checkpointDb.compactDB(compactOptions);
+          }
+          LOG.info("Completed force compaction of all tables");
+
+          // Verify defrag DB integrity
+          verifyDbIntegrity(originalDb, checkpointDb, snapshotInfo);
+
+          // Update snapshot metadata to mark defragmentation as complete
+          updateSnapshotMetadataAfterDefrag(rdbStore, snapshotInfo);
+        }
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   /**
    * Performs full defragmentation for the first snapshot in the chain.
-   * This is a simplified implementation that demonstrates the concept.
+   * Steps:
+   * 1. Take checkpoint of current DB
+   * 2. Create checkpoint MetadataManager after taking a checkpoint
+   * 3. DeleteRange KeyTable from ["", obsPrefix) +
+   *    [lexicographicalHigherString(keyTablePrefix), lexicographicalHigherString("/")]
+   * 4. DeleteRange DirectoryTable from ["", fsoPrefix) +
+   *    [lexicographicalHigherString(fsoPrefix), lexicographicalHigherString("/")]
+   * 5. DeleteRange FileTable from ["", fsoPrefix) +
+   *    [lexicographicalHigherString(fsoPrefix), lexicographicalHigherString("/")]
+   * 6. Force compact all tables in the checkpoint.
    */
   private void performFullDefragmentation(SnapshotInfo snapshotInfo,
       OmSnapshot omSnapshot) throws IOException {
-    // TODO: Implement full defragmentation
+    String snapshotPath = OmSnapshotManager.getSnapshotPath(
+        ozoneManager.getConfiguration(), snapshotInfo);
+
+    // For defraggedDbPath, we need to go up to the parent directory and use checkpointStateDefragged
+    String parentDir = Paths.get(snapshotPath).getParent().getParent().getParent().toString();
+    String checkpointDirName = Paths.get(snapshotPath).getFileName().toString();
+    // TODO: Append version number to defraggedDbPath after defrag. e.g. om.db-fef74426-d01b-4c67-b20b-7750376c17dd-v1
Review Comment:
Version numbering for defragmented snapshots is mentioned in multiple TODOs
(lines 298, 351, 357). This functionality should be implemented before merging,
as it's critical for tracking snapshot versions and preventing conflicts when
multiple defrag operations occur.
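A minimal sketch of what the TODOs describe, assuming the version number is supplied by the caller (the PR has not yet defined where it comes from):
```java
// Hypothetical helper: build the versioned defragged DB path, e.g.
// .../checkpointStateDefragged/om.db-fef74426-d01b-4c67-b20b-7750376c17dd-v1
private static String versionedDefraggedDbPath(String parentDir,
    String checkpointDirName, int version) {
  return Paths.get(parentDir, CHECKPOINT_STATE_DEFRAGGED_DIR,
      checkpointDirName + "-v" + version).toString();
}
```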
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java:
##########
@@ -166,7 +340,484 @@ private void performIncrementalDefragmentation(SnapshotInfo currentSnapshot,
       SnapshotInfo previousDefraggedSnapshot, OmSnapshot currentOmSnapshot)
       throws IOException {
-    // TODO: Implement incremental defragmentation
+    String currentSnapshotPath = OmSnapshotManager.getSnapshotPath(
+        ozoneManager.getConfiguration(), currentSnapshot);
+    String previousSnapshotPath = OmSnapshotManager.getSnapshotPath(
+        ozoneManager.getConfiguration(), previousDefraggedSnapshot);
+
+    // Fix path construction similar to performFullDefragmentation
+    String previousParentDir = Paths.get(previousSnapshotPath).getParent().getParent().getParent().toString();
+    String previousCheckpointDirName = Paths.get(previousSnapshotPath).getFileName().toString();
+    // TODO: Append version number
+    String previousDefraggedDbPath = Paths.get(previousParentDir, CHECKPOINT_STATE_DEFRAGGED_DIR,
+        previousCheckpointDirName).toString();
+
+    String currentParentDir = Paths.get(currentSnapshotPath).getParent().getParent().getParent().toString();
+    String currentCheckpointDirName = Paths.get(currentSnapshotPath).getFileName().toString();
+    // TODO: Append version number as well. e.g. om.db-fef74426-d01b-4c67-b20b-7750376c17dd-v1
+    String currentDefraggedDbPath = Paths.get(currentParentDir, CHECKPOINT_STATE_DEFRAGGED_DIR,
+        currentCheckpointDirName).toString();
+
+    LOG.info("Starting incremental defragmentation for snapshot: {} using previous defragged snapshot: {}",
+        currentSnapshot.getName(), previousDefraggedSnapshot.getName());
+    LOG.info("Previous defragmented DB: {}", previousDefraggedDbPath);
+    LOG.info("Current target DB: {}", currentDefraggedDbPath);
+
+    // Note: Don't create target directory - RocksDB createCheckpoint() will create it
+    // and will fail with "Directory exists" if we create it first
+
+    try {
+      // Check if previous defragmented DB exists
+      if (!Files.exists(Paths.get(previousDefraggedDbPath))) {
+        // TODO: Should err and quit instead of falling back to full defrag
+        LOG.warn("Previous defragmented DB not found at {}, falling back to full defragmentation",
+            previousDefraggedDbPath);
+        performFullDefragmentation(currentSnapshot, currentOmSnapshot);
+        return;
+      }
+
+      // Create a checkpoint from the previous defragmented DB directly at target location
+      LOG.info("Creating checkpoint from previous defragmented DB directly to target location");
+
+      final OmSnapshotManager snapshotManager = ozoneManager.getOmSnapshotManager();
+      try (UncheckedAutoCloseableSupplier<OmSnapshot> prevSnapshotSupplier =
+          snapshotManager.getSnapshot(previousDefraggedSnapshot.getSnapshotId())) {
+        LOG.info("Opened previous (defragged) snapshot: {}", previousDefraggedSnapshot.getName());
+        OmSnapshot prevSnapshot = prevSnapshotSupplier.get();
+        // Sanity check: Ensure previous snapshot is marked as defragmented
+        if (needsDefragmentation(previousDefraggedSnapshot)) {
+          LOG.error("Previous snapshot {} is not marked as defragmented. Something is wrong.",
+              previousDefraggedSnapshot.getName());
+          return;
+        }
+        RDBStore prevStore = (RDBStore) prevSnapshot.getMetadataManager().getStore();
+        RocksDatabase prevDb = prevStore.getDb();
+        assert !prevDb.isClosed();
+
+        try (RocksDatabase.RocksCheckpoint checkpoint = prevDb.createCheckpoint()) {
+          checkpoint.createCheckpoint(Paths.get(currentDefraggedDbPath));
+          LOG.info("Created checkpoint at: {}", currentDefraggedDbPath);
+        }
+      }
+
+      final String volumeName = currentSnapshot.getVolumeName();
+      final String bucketName = currentSnapshot.getBucketName();
+      OzoneConfiguration conf = ozoneManager.getConfiguration();
+      final int maxOpenSstFilesInSnapshotDb = conf.getInt(
+          OZONE_OM_SNAPSHOT_DB_MAX_OPEN_FILES, OZONE_OM_SNAPSHOT_DB_MAX_OPEN_FILES_DEFAULT);
+
+      final String currSnapshotDirName = currentCheckpointDirName.substring(OM_DB_NAME.length());
+      try (OMMetadataManager currDefragDbMetadataManager = new OmMetadataManagerImpl(
+          conf, currSnapshotDirName, maxOpenSstFilesInSnapshotDb, true)) {
+        LOG.info("Opened OMMetadataManager for checkpoint DB for incremental update");
+        try (RDBStore currentDefraggedStore = (RDBStore) currDefragDbMetadataManager.getStore()) {
+          try (RocksDatabase currentDefraggedDb = currentDefraggedStore.getDb()) {
+            LOG.info("Opened checkpoint as working defragmented DB for incremental update");
+
+            // Apply incremental changes from current snapshot
+            RDBStore currentSnapshotStore = (RDBStore) currentOmSnapshot.getMetadataManager().getStore();
+            RocksDatabase currentSnapshotDb = currentSnapshotStore.getDb();
+
+            long incrementalKeysCopied = applyIncrementalChanges(currentSnapshotDb, currentDefraggedStore,
+                currentSnapshot, previousDefraggedSnapshot);
+
+            LOG.info("Applied {} incremental changes for snapshot: {}",
+                incrementalKeysCopied, currentSnapshot.getName());
+
+            // Verify defrag DB integrity. TODO: Abort and stop defrag service if verification fails?
+            verifyDbIntegrity(currentSnapshotDb, currentDefraggedDb, currentSnapshot);
+
+            // Create a new version in YAML metadata, which also indicates that the defragmentation is complete
+            updateSnapshotMetadataAfterDefrag(currentDefraggedStore, currentSnapshot);
+
+            LOG.info("Successfully completed incremental defragmentation for snapshot: {} with {} incremental changes",
+                currentSnapshot.getName(), incrementalKeysCopied);
+          }
+        }
+      }
+
+    } catch (RocksDatabaseException e) {
+      LOG.error("RocksDB error during incremental defragmentation of snapshot: {}",
+          currentSnapshot.getName(), e);
+//      LOG.warn("Falling back to full defragmentation due to error");
+//      performFullDefragmentation(currentSnapshot, currentOmSnapshot);
+    } catch (Exception e) {
+      LOG.error("Unexpected error during incremental defragmentation of snapshot: {}",
+          currentSnapshot.getName(), e);
+      LOG.warn("Falling back to full defragmentation due to error");
+      performFullDefragmentation(currentSnapshot, currentOmSnapshot);
+    }
+  }
+
+  /**
+   * Applies incremental changes by using SnapshotDiffManager#getDeltaFiles() to get SST files,
+   * then iterating through keys using SstFileSetReader with byte-level comparison.
+   * Uses RDBSstFileWriter to directly write changes to SST files and then ingests them.
+   */
+  @SuppressWarnings("checkstyle:MethodLength")
+  private long applyIncrementalChanges(RocksDatabase currentSnapshotDb, DBStore targetStore,
+      SnapshotInfo currentSnapshot, SnapshotInfo previousSnapshot) throws IOException {
+
+    LOG.info("Applying incremental changes for snapshot: {} since: {} using delta files approach",
+        currentSnapshot.getName(), previousSnapshot.getName());
+
+    long totalChanges = 0;
+
+    // Create temporary directory for SST files
+    String currentSnapshotPath = OmSnapshotManager.getSnapshotPath(
+        ozoneManager.getConfiguration(), currentSnapshot);
+    String parentDir = Paths.get(currentSnapshotPath).getParent().getParent().toString();
+    String tempSstDir = Paths.get(parentDir, CHECKPOINT_STATE_DEFRAGGED_DIR, TEMP_DIFF_DIR).toString();
+    String diffDir = Paths.get(parentDir, CHECKPOINT_STATE_DEFRAGGED_DIR, "deltaFilesDiff-" +
+        UUID.randomUUID()).toString();
+
+    try {
+      Files.createDirectories(Paths.get(tempSstDir));
+      LOG.info("Created temporary SST directory: {}", tempSstDir);
+      Files.createDirectories(Paths.get(diffDir));
+      LOG.info("Created diff directory: {}", diffDir);
+
+      // Get OmSnapshotManager
+      OmSnapshotManager snapshotManager = ozoneManager.getOmSnapshotManager();
+
+      // Get OmSnapshot instances for previous and current snapshots
+      try (UncheckedAutoCloseableSupplier<OmSnapshot> previousSnapshotSupplier =
+          snapshotManager.getSnapshot(previousSnapshot.getSnapshotId());
+          UncheckedAutoCloseableSupplier<OmSnapshot> currentSnapshotSupplier =
+              snapshotManager.getSnapshot(currentSnapshot.getSnapshotId())) {
+
+        OmSnapshot previousOmSnapshot = previousSnapshotSupplier.get();
+        OmSnapshot currentOmSnapshot = currentSnapshotSupplier.get();
+
+        // Get the SnapshotDiffManager
+        SnapshotDiffManager diffManager = snapshotManager.getSnapshotDiffManager();
+
+        // Get column family to key prefix map for filtering SST files
+        Map<String, String> tablePrefixes = getColumnFamilyToKeyPrefixMap(
+            ozoneManager.getMetadataManager(),
+            currentSnapshot.getVolumeName(),
+            currentSnapshot.getBucketName());
+
+        // Get table references for target database
+        RDBStore targetRdbStore = (RDBStore) targetStore;
+        RocksDatabase targetDb = targetRdbStore.getDb();
+
+        RDBStore previousDefraggedStore = (RDBStore) targetStore; // The previous defragged DB is our target base
+        RocksDatabase previousDefraggedDb = previousDefraggedStore.getDb();
+
+        // Process each column family
+        for (String cfName : COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT) {
+          RocksDatabase.ColumnFamily currentCf = currentSnapshotDb.getColumnFamily(cfName);
+          RocksDatabase.ColumnFamily previousCf = previousDefraggedDb.getColumnFamily(cfName);
+          RocksDatabase.ColumnFamily targetCf = targetDb.getColumnFamily(cfName);
+
+          if (currentCf == null || previousCf == null || targetCf == null) {
+            LOG.warn("Column family {} not found in one of the databases, skipping incremental changes", cfName);
+            continue;
+          }
+
+          Table<byte[], byte[]> targetTable = targetRdbStore.getTable(cfName);
+          if (targetTable == null) {
+            LOG.warn("Table {} not found in target store, skipping", cfName);
+            continue;
+          }
+
+          // Get delta files for this column family
+          List<String> tablesToLookUp = Collections.singletonList(cfName);
+          Set<String> deltaFiles;
+          try {
+            deltaFiles = diffManager.getDeltaFiles(
+                previousOmSnapshot, currentOmSnapshot,
+                tablesToLookUp,
+                previousSnapshot, currentSnapshot,
+                false, // useFullDiff = false
+                tablePrefixes,
+                diffDir,
+                "defrag-" + currentSnapshot.getSnapshotId() // jobKey
+            );
+            LOG.info("Got {} delta SST files for column family: {}", deltaFiles.size(), cfName);
+          } catch (Exception e) {
+            LOG.error("Failed to get delta files for column family {}: {}", cfName, e.getMessage(), e);
+            continue;
+          }
+
+          if (deltaFiles.isEmpty()) {
+            LOG.info("No delta files for column family {}, skipping", cfName);
+            continue;
+          }
+
+          AtomicLong cfChanges = new AtomicLong(0);
+          String sstFileName = cfName + "_" + currentSnapshot.getSnapshotId() + ".sst";
+          File sstFile = new File(tempSstDir, sstFileName);
+
+          LOG.debug("Creating SST file for column family {} changes: {}", cfName, sstFile.getAbsolutePath());
+
+          // Use SstFileSetReader to read keys from delta files
+          SstFileSetReader sstFileReader = new SstFileSetReader(deltaFiles);
+
+          try (RDBSstFileWriter sstWriter = new RDBSstFileWriter(sstFile)) {
+
+            // Get key stream with tombstones from delta files
+            try (Stream<String> keysToCheck = sstFileReader.getKeyStreamWithTombstone(null, null)) {
+
+              keysToCheck.forEach(keyStr -> {
+                try {
+                  byte[] key = keyStr.getBytes(StandardCharsets.UTF_8);
+
+                  // Get values from previous defragmented snapshot and current snapshot
+                  byte[] previousValue = previousDefraggedDb.get(previousCf, key);
+                  byte[] currentValue = currentSnapshotDb.get(currentCf, key);
+
+                  // Byte-level comparison: only write if values are different
+                  if (!Arrays.equals(previousValue, currentValue)) {
+                    if (currentValue != null) {
+                      // Key exists in current snapshot - write the new value
+                      sstWriter.put(key, currentValue);
+                    } else {
+                      // Key doesn't exist in current snapshot (deleted) - write tombstone
+                      sstWriter.delete(key);
+                    }
+
+                    // Increment change counter
+                    cfChanges.getAndIncrement();
+                  }
+                } catch (Exception e) {
+                  LOG.error("Error processing key {} for column family {}: {}",
+                      keyStr, cfName, e.getMessage(), e);
+                  throw new RuntimeException(e);
+                }
+              });
+
+            } catch (RocksDBException e) {
+              LOG.error("RocksDB error reading keys from delta files for column family {}: {}",
+                  cfName, e.getMessage(), e);
+            }
+
+            LOG.debug("Finished writing {} changes for column family: {} to SST file",
+                cfChanges.get(), cfName);
+
+          } catch (Exception e) {
+            LOG.error("Error processing column family {} for snapshot {}: {}",
+                cfName, currentSnapshot.getName(), e.getMessage(), e);
+          }
+
+          // Ingest SST file into target database if there were changes
+          if (sstFile.exists() && sstFile.length() > 0) {
+            try {
+              targetTable.loadFromFile(sstFile);
+              LOG.info("Successfully ingested SST file for column family {}: {} changes",
+                  cfName, cfChanges);
+            } catch (Exception e) {
+              LOG.error("Failed to ingest SST file for column family {}: {}", cfName, e.getMessage(), e);
+            }
+          } else {
+            LOG.debug("No changes to ingest for column family {}", cfName);
+          }
+
+          // Clean up SST file after ingestion
+          try {
+            if (sstFile.exists()) {
+              Files.delete(sstFile.toPath());
+              LOG.debug("Cleaned up SST file: {}", sstFile.getAbsolutePath());
+            }
+          } catch (IOException e) {
+            LOG.warn("Failed to clean up SST file: {}", sstFile.getAbsolutePath(), e);
+          }
+
+          totalChanges += cfChanges.get();
+          LOG.debug("Applied {} incremental changes for column family: {}", cfChanges, cfName);
+        }
+      }
+
+      // Clean up temporary directories
+      try {
+        Files.deleteIfExists(Paths.get(tempSstDir));
Review Comment:
Using `Files.deleteIfExists()` on a directory will fail if the directory is
not empty. This should use recursive deletion similar to line 641 which uses
`FileUtils.deleteDirectory()`. The cleanup may silently fail, leaving temporary
SST files.
```suggestion
        org.apache.commons.io.FileUtils.deleteDirectory(new File(tempSstDir));
```
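`FileUtils.deleteDirectory` deletes recursively, which is presumably why line 641 already uses it; if `diffDir` is also cleaned up with `Files.deleteIfExists`, it would need the same treatment.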
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java:
##########
@@ -166,7 +340,484 @@ private void performIncrementalDefragmentation(SnapshotInfo currentSnapshot,
       SnapshotInfo previousDefraggedSnapshot, OmSnapshot currentOmSnapshot)
       throws IOException {
-    // TODO: Implement incremental defragmentation
+    String currentSnapshotPath = OmSnapshotManager.getSnapshotPath(
+        ozoneManager.getConfiguration(), currentSnapshot);
+    String previousSnapshotPath = OmSnapshotManager.getSnapshotPath(
+        ozoneManager.getConfiguration(), previousDefraggedSnapshot);
+
+    // Fix path construction similar to performFullDefragmentation
+    String previousParentDir = Paths.get(previousSnapshotPath).getParent().getParent().getParent().toString();
+    String previousCheckpointDirName = Paths.get(previousSnapshotPath).getFileName().toString();
+    // TODO: Append version number
+    String previousDefraggedDbPath = Paths.get(previousParentDir, CHECKPOINT_STATE_DEFRAGGED_DIR,
+        previousCheckpointDirName).toString();
+
+    String currentParentDir = Paths.get(currentSnapshotPath).getParent().getParent().getParent().toString();
+    String currentCheckpointDirName = Paths.get(currentSnapshotPath).getFileName().toString();
+    // TODO: Append version number as well. e.g. om.db-fef74426-d01b-4c67-b20b-7750376c17dd-v1
+    String currentDefraggedDbPath = Paths.get(currentParentDir, CHECKPOINT_STATE_DEFRAGGED_DIR,
+        currentCheckpointDirName).toString();
+
+    LOG.info("Starting incremental defragmentation for snapshot: {} using previous defragged snapshot: {}",
+        currentSnapshot.getName(), previousDefraggedSnapshot.getName());
+    LOG.info("Previous defragmented DB: {}", previousDefraggedDbPath);
+    LOG.info("Current target DB: {}", currentDefraggedDbPath);
+
+    // Note: Don't create target directory - RocksDB createCheckpoint() will create it
+    // and will fail with "Directory exists" if we create it first
+
+    try {
+      // Check if previous defragmented DB exists
+      if (!Files.exists(Paths.get(previousDefraggedDbPath))) {
+        // TODO: Should err and quit instead of falling back to full defrag
+        LOG.warn("Previous defragmented DB not found at {}, falling back to full defragmentation",
+            previousDefraggedDbPath);
+        performFullDefragmentation(currentSnapshot, currentOmSnapshot);
+        return;
Review Comment:
When the previous defragmented DB is not found during incremental
defragmentation, the code falls back to full defragmentation. This can mask
errors and lead to unexpected resource consumption. The code should throw an
exception instead, as indicated by the TODO comment, to fail fast and alert
operators to the issue.
```suggestion
        // Fail fast: throw exception instead of falling back to full defragmentation
        String errorMsg = String.format("Previous defragmented DB not found at %s for snapshot %s. "
            + "Incremental defragmentation cannot proceed.",
            previousDefraggedDbPath, previousDefraggedSnapshot.getName());
        LOG.error(errorMsg);
        throw new IOException(errorMsg);
```
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java:
##########
@@ -166,7 +340,484 @@ private void performIncrementalDefragmentation(SnapshotInfo currentSnapshot,
       SnapshotInfo previousDefraggedSnapshot, OmSnapshot currentOmSnapshot)
       throws IOException {
-    // TODO: Implement incremental defragmentation
+    String currentSnapshotPath = OmSnapshotManager.getSnapshotPath(
+        ozoneManager.getConfiguration(), currentSnapshot);
+    String previousSnapshotPath = OmSnapshotManager.getSnapshotPath(
+        ozoneManager.getConfiguration(), previousDefraggedSnapshot);
+
+    // Fix path construction similar to performFullDefragmentation
+    String previousParentDir = Paths.get(previousSnapshotPath).getParent().getParent().getParent().toString();
+    String previousCheckpointDirName = Paths.get(previousSnapshotPath).getFileName().toString();
+    // TODO: Append version number
+    String previousDefraggedDbPath = Paths.get(previousParentDir, CHECKPOINT_STATE_DEFRAGGED_DIR,
+        previousCheckpointDirName).toString();
+
+    String currentParentDir = Paths.get(currentSnapshotPath).getParent().getParent().getParent().toString();
+    String currentCheckpointDirName = Paths.get(currentSnapshotPath).getFileName().toString();
+    // TODO: Append version number as well. e.g. om.db-fef74426-d01b-4c67-b20b-7750376c17dd-v1
+    String currentDefraggedDbPath = Paths.get(currentParentDir, CHECKPOINT_STATE_DEFRAGGED_DIR,
+        currentCheckpointDirName).toString();
+
+    LOG.info("Starting incremental defragmentation for snapshot: {} using previous defragged snapshot: {}",
+        currentSnapshot.getName(), previousDefraggedSnapshot.getName());
+    LOG.info("Previous defragmented DB: {}", previousDefraggedDbPath);
+    LOG.info("Current target DB: {}", currentDefraggedDbPath);
+
+    // Note: Don't create target directory - RocksDB createCheckpoint() will create it
+    // and will fail with "Directory exists" if we create it first
+
+    try {
+      // Check if previous defragmented DB exists
+      if (!Files.exists(Paths.get(previousDefraggedDbPath))) {
+        // TODO: Should err and quit instead of falling back to full defrag
+        LOG.warn("Previous defragmented DB not found at {}, falling back to full defragmentation",
+            previousDefraggedDbPath);
+        performFullDefragmentation(currentSnapshot, currentOmSnapshot);
+        return;
+      }
+
+      // Create a checkpoint from the previous defragmented DB directly at target location
+      LOG.info("Creating checkpoint from previous defragmented DB directly to target location");
+
+      final OmSnapshotManager snapshotManager = ozoneManager.getOmSnapshotManager();
+      try (UncheckedAutoCloseableSupplier<OmSnapshot> prevSnapshotSupplier =
+          snapshotManager.getSnapshot(previousDefraggedSnapshot.getSnapshotId())) {
+        LOG.info("Opened previous (defragged) snapshot: {}", previousDefraggedSnapshot.getName());
+        OmSnapshot prevSnapshot = prevSnapshotSupplier.get();
+        // Sanity check: Ensure previous snapshot is marked as defragmented
+        if (needsDefragmentation(previousDefraggedSnapshot)) {
+          LOG.error("Previous snapshot {} is not marked as defragmented. Something is wrong.",
+              previousDefraggedSnapshot.getName());
+          return;
+        }
+        RDBStore prevStore = (RDBStore) prevSnapshot.getMetadataManager().getStore();
+        RocksDatabase prevDb = prevStore.getDb();
+        assert !prevDb.isClosed();
+
+        try (RocksDatabase.RocksCheckpoint checkpoint = prevDb.createCheckpoint()) {
+          checkpoint.createCheckpoint(Paths.get(currentDefraggedDbPath));
+          LOG.info("Created checkpoint at: {}", currentDefraggedDbPath);
+        }
+      }
+
+      final String volumeName = currentSnapshot.getVolumeName();
+      final String bucketName = currentSnapshot.getBucketName();
Review Comment:
Variable 'String bucketName' is never read.
```suggestion
```