smengcl commented on code in PR #9227:
URL: https://github.com/apache/ozone/pull/9227#discussion_r2483187578
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotDefragService.java:
##########
@@ -166,7 +340,484 @@ private void performIncrementalDefragmentation(SnapshotInfo currentSnapshot,
       SnapshotInfo previousDefraggedSnapshot, OmSnapshot currentOmSnapshot)
       throws IOException {
-    // TODO: Implement incremental defragmentation
+    String currentSnapshotPath = OmSnapshotManager.getSnapshotPath(
+        ozoneManager.getConfiguration(), currentSnapshot);
+    String previousSnapshotPath = OmSnapshotManager.getSnapshotPath(
+        ozoneManager.getConfiguration(), previousDefraggedSnapshot);
+
+    // Fix path construction similar to performFullDefragmentation
+    String previousParentDir =
+        Paths.get(previousSnapshotPath).getParent().getParent().getParent().toString();
+    String previousCheckpointDirName = Paths.get(previousSnapshotPath).getFileName().toString();
+    // TODO: Append version number
+    String previousDefraggedDbPath = Paths.get(previousParentDir, CHECKPOINT_STATE_DEFRAGGED_DIR,
+        previousCheckpointDirName).toString();
+
+    String currentParentDir =
+        Paths.get(currentSnapshotPath).getParent().getParent().getParent().toString();
+    String currentCheckpointDirName = Paths.get(currentSnapshotPath).getFileName().toString();
+    // TODO: Append version number as well, e.g. om.db-fef74426-d01b-4c67-b20b-7750376c17dd-v1
+    String currentDefraggedDbPath = Paths.get(currentParentDir, CHECKPOINT_STATE_DEFRAGGED_DIR,
+        currentCheckpointDirName).toString();
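+    // For illustration (assumption, not yet implemented): once the version-number TODOs above are
+    // done, the defragged DB would land at .../<CHECKPOINT_STATE_DEFRAGGED_DIR>/om.db-<snapshotId>-v1,
+    // a sibling tree three levels up from the snapshot checkpoint path.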
+
+ LOG.info("Starting incremental defragmentation for snapshot: {} using
previous defragged snapshot: {}",
+ currentSnapshot.getName(), previousDefraggedSnapshot.getName());
+ LOG.info("Previous defragmented DB: {}", previousDefraggedDbPath);
+ LOG.info("Current target DB: {}", currentDefraggedDbPath);
+
+ // Note: Don't create target directory - RocksDB createCheckpoint() will
create it
+ // and will fail with "Directory exists" if we create it first
+
+    try {
+      // Check if previous defragmented DB exists
+      if (!Files.exists(Paths.get(previousDefraggedDbPath))) {
+        // TODO: Should err and quit instead of falling back to full defrag
+        LOG.warn("Previous defragmented DB not found at {}, falling back to full defragmentation",
+            previousDefraggedDbPath);
+        performFullDefragmentation(currentSnapshot, currentOmSnapshot);
+        return;
+      }
+
+      // Create a checkpoint from the previous defragmented DB directly at target location
+      LOG.info("Creating checkpoint from previous defragmented DB directly to target location");
+
+      final OmSnapshotManager snapshotManager = ozoneManager.getOmSnapshotManager();
+      try (UncheckedAutoCloseableSupplier<OmSnapshot> prevSnapshotSupplier =
+          snapshotManager.getSnapshot(previousDefraggedSnapshot.getSnapshotId())) {
+        LOG.info("Opened previous (defragged) snapshot: {}", previousDefraggedSnapshot.getName());
+        OmSnapshot prevSnapshot = prevSnapshotSupplier.get();
+        // Sanity check: ensure the previous snapshot is marked as defragmented
+        if (needsDefragmentation(previousDefraggedSnapshot)) {
+          LOG.error("Previous snapshot {} is not marked as defragmented. Something is wrong.",
+              previousDefraggedSnapshot.getName());
+          return;
+        }
+        RDBStore prevStore = (RDBStore) prevSnapshot.getMetadataManager().getStore();
+        RocksDatabase prevDb = prevStore.getDb();
+        assert !prevDb.isClosed();
+
+        try (RocksDatabase.RocksCheckpoint checkpoint = prevDb.createCheckpoint()) {
+          checkpoint.createCheckpoint(Paths.get(currentDefraggedDbPath));
+          LOG.info("Created checkpoint at: {}", currentDefraggedDbPath);
+        }
+      }
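+      // General RocksDB behavior, not specific to this change: a checkpoint created on the same
+      // filesystem hard-links the immutable SST files rather than copying them, so seeding the
+      // target DB from the previous defragged DB above is cheap.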
+
+      final String volumeName = currentSnapshot.getVolumeName();
+      final String bucketName = currentSnapshot.getBucketName();
+      OzoneConfiguration conf = ozoneManager.getConfiguration();
+      final int maxOpenSstFilesInSnapshotDb = conf.getInt(
+          OZONE_OM_SNAPSHOT_DB_MAX_OPEN_FILES, OZONE_OM_SNAPSHOT_DB_MAX_OPEN_FILES_DEFAULT);
+
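+      // currentCheckpointDirName is "om.db-<snapshotId>"; stripping the OM_DB_NAME prefix below
+      // leaves the "-<snapshotId>" suffix that OmMetadataManagerImpl expects for a snapshot DB
+      // dir name (assumption, based on how snapshot metadata managers are constructed).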
+      final String currSnapshotDirName = currentCheckpointDirName.substring(OM_DB_NAME.length());
+      try (OMMetadataManager currDefragDbMetadataManager = new OmMetadataManagerImpl(
+          conf, currSnapshotDirName, maxOpenSstFilesInSnapshotDb, true)) {
+        LOG.info("Opened OMMetadataManager for checkpoint DB for incremental update");
+        try (RDBStore currentDefraggedStore = (RDBStore) currDefragDbMetadataManager.getStore()) {
+          try (RocksDatabase currentDefraggedDb = currentDefraggedStore.getDb()) {
+            LOG.info("Opened checkpoint as working defragmented DB for incremental update");
+
+            // Apply incremental changes from current snapshot
+            RDBStore currentSnapshotStore = (RDBStore) currentOmSnapshot.getMetadataManager().getStore();
+            RocksDatabase currentSnapshotDb = currentSnapshotStore.getDb();
+
+            long incrementalKeysCopied = applyIncrementalChanges(currentSnapshotDb, currentDefraggedStore,
+                currentSnapshot, previousDefraggedSnapshot);
+
+            LOG.info("Applied {} incremental changes for snapshot: {}",
+                incrementalKeysCopied, currentSnapshot.getName());
+
+            // Verify defrag DB integrity. TODO: Abort and stop defrag service if verification fails?
+            verifyDbIntegrity(currentSnapshotDb, currentDefraggedDb, currentSnapshot);
+
+            // Create a new version in YAML metadata, which also indicates that the defragmentation is complete
+            updateSnapshotMetadataAfterDefrag(currentDefraggedStore, currentSnapshot);
+
+            LOG.info("Successfully completed incremental defragmentation for snapshot: {} with {} incremental changes",
+                currentSnapshot.getName(), incrementalKeysCopied);
+          }
+        }
+      }
+
+    } catch (RocksDatabaseException e) {
+      LOG.error("RocksDB error during incremental defragmentation of snapshot: {}",
+          currentSnapshot.getName(), e);
+//      LOG.warn("Falling back to full defragmentation due to error");
+//      performFullDefragmentation(currentSnapshot, currentOmSnapshot);
+    } catch (Exception e) {
+      LOG.error("Unexpected error during incremental defragmentation of snapshot: {}",
+          currentSnapshot.getName(), e);
+      LOG.warn("Falling back to full defragmentation due to error");
+      performFullDefragmentation(currentSnapshot, currentOmSnapshot);
+    }
+  }
+
+  /**
+   * Applies incremental changes by using SnapshotDiffManager#getDeltaFiles() to get the delta SST
+   * files, then iterating through their keys with SstFileSetReader and comparing values byte-for-byte.
+   * Uses RDBSstFileWriter to write the changes directly to SST files, which are then ingested.
+   */
+  @SuppressWarnings("checkstyle:MethodLength")
+  private long applyIncrementalChanges(RocksDatabase currentSnapshotDb, DBStore targetStore,
+      SnapshotInfo currentSnapshot, SnapshotInfo previousSnapshot) throws IOException {
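+    // Sketch of the per-column-family flow implemented below:
+    //   1. SnapshotDiffManager#getDeltaFiles() -> SST files that differ between the two snapshots
+    //   2. SstFileSetReader#getKeyStreamWithTombstone() -> candidate keys, including deleted ones
+    //   3. byte-compare previous defragged value vs. current snapshot value; write a put or a
+    //      tombstone through RDBSstFileWriter only when they differ
+    //   4. ingest the resulting SST file into the target DB via Table#loadFromFile()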
+
+ LOG.info("Applying incremental changes for snapshot: {} since: {} using
delta files approach",
+ currentSnapshot.getName(), previousSnapshot.getName());
+
+ long totalChanges = 0;
+
+ // Create temporary directory for SST files
+ String currentSnapshotPath = OmSnapshotManager.getSnapshotPath(
+ ozoneManager.getConfiguration(), currentSnapshot);
+ String parentDir =
Paths.get(currentSnapshotPath).getParent().getParent().toString();
+ String tempSstDir = Paths.get(parentDir, CHECKPOINT_STATE_DEFRAGGED_DIR,
TEMP_DIFF_DIR).toString();
+ String diffDir = Paths.get(parentDir, CHECKPOINT_STATE_DEFRAGGED_DIR,
"deltaFilesDiff-" +
+ UUID.randomUUID()).toString();
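+    // The random UUID suffix keeps diff dirs from colliding across runs (e.g. a retry after a
+    // failed attempt); tempSstDir holds the per-column-family SST files deleted after ingestion below.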
+
+    try {
+      Files.createDirectories(Paths.get(tempSstDir));
+      LOG.info("Created temporary SST directory: {}", tempSstDir);
+      Files.createDirectories(Paths.get(diffDir));
+      LOG.info("Created diff directory: {}", diffDir);
+
+      // Get OmSnapshotManager
+      OmSnapshotManager snapshotManager = ozoneManager.getOmSnapshotManager();
+
+      // Get OmSnapshot instances for previous and current snapshots
+      try (UncheckedAutoCloseableSupplier<OmSnapshot> previousSnapshotSupplier =
+          snapshotManager.getSnapshot(previousSnapshot.getSnapshotId());
+          UncheckedAutoCloseableSupplier<OmSnapshot> currentSnapshotSupplier =
+              snapshotManager.getSnapshot(currentSnapshot.getSnapshotId())) {
+
+        OmSnapshot previousOmSnapshot = previousSnapshotSupplier.get();
+        OmSnapshot currentOmSnapshot = currentSnapshotSupplier.get();
+
+        // Get the SnapshotDiffManager
+        SnapshotDiffManager diffManager = snapshotManager.getSnapshotDiffManager();
+
+        // Get column family to key prefix map for filtering SST files
+        Map<String, String> tablePrefixes = getColumnFamilyToKeyPrefixMap(
+            ozoneManager.getMetadataManager(),
+            currentSnapshot.getVolumeName(),
+            currentSnapshot.getBucketName());
+
+        // Get table references for target database
+        RDBStore targetRdbStore = (RDBStore) targetStore;
+        RocksDatabase targetDb = targetRdbStore.getDb();
+
+        RDBStore previousDefraggedStore = (RDBStore) targetStore; // The previous defragged DB is our target base
+        RocksDatabase previousDefraggedDb = previousDefraggedStore.getDb();
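+        // Note: targetStore serves as both the "previous" and the "target" DB here -- the
+        // checkpoint created by the caller already contains the previous defragged state, so
+        // previousDefraggedDb and targetDb are the same handle.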
+
+        // Process each column family
+        for (String cfName : COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT) {
+          RocksDatabase.ColumnFamily currentCf = currentSnapshotDb.getColumnFamily(cfName);
+          RocksDatabase.ColumnFamily previousCf = previousDefraggedDb.getColumnFamily(cfName);
+          RocksDatabase.ColumnFamily targetCf = targetDb.getColumnFamily(cfName);
+
+          if (currentCf == null || previousCf == null || targetCf == null) {
+            LOG.warn("Column family {} not found in one of the databases, skipping incremental changes", cfName);
+            continue;
+          }
+
+          Table<byte[], byte[]> targetTable = targetRdbStore.getTable(cfName);
+          if (targetTable == null) {
+            LOG.warn("Table {} not found in target store, skipping", cfName);
+            continue;
+          }
+
+          // Get delta files for this column family
+          List<String> tablesToLookUp = Collections.singletonList(cfName);
+          Set<String> deltaFiles;
+          try {
+            deltaFiles = diffManager.getDeltaFiles(
+                previousOmSnapshot, currentOmSnapshot,
+                tablesToLookUp,
+                previousSnapshot, currentSnapshot,
+                false, // useFullDiff = false
+                tablePrefixes,
+                diffDir,
+                "defrag-" + currentSnapshot.getSnapshotId()); // jobKey
+            LOG.info("Got {} delta SST files for column family: {}", deltaFiles.size(), cfName);
+          } catch (Exception e) {
+            LOG.error("Failed to get delta files for column family {}: {}", cfName, e.getMessage(), e);
+            continue;
+          }
+
+          if (deltaFiles.isEmpty()) {
+            LOG.info("No delta files for column family {}, skipping", cfName);
+            continue;
+          }
+
+          AtomicLong cfChanges = new AtomicLong(0);
+          String sstFileName = cfName + "_" + currentSnapshot.getSnapshotId() + ".sst";
+          File sstFile = new File(tempSstDir, sstFileName);
+
+          LOG.debug("Creating SST file for column family {} changes: {}", cfName, sstFile.getAbsolutePath());
+
+          // Use SstFileSetReader to read keys from delta files
+          SstFileSetReader sstFileReader = new SstFileSetReader(deltaFiles);
+
+          try (RDBSstFileWriter sstWriter = new RDBSstFileWriter(sstFile)) {
+
+            // Get key stream with tombstones from delta files
+            try (Stream<String> keysToCheck = sstFileReader.getKeyStreamWithTombstone(null, null)) {
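+              // The (null, null) bounds are read here as "no lower/upper key bound", i.e. stream
+              // the full key range of the delta files (assumption based on this call site).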
+
+              keysToCheck.forEach(keyStr -> {
+                try {
+                  byte[] key = keyStr.getBytes(StandardCharsets.UTF_8);
+
+                  // Get values from previous defragmented snapshot and current snapshot
+                  byte[] previousValue = previousDefraggedDb.get(previousCf, key);
+                  byte[] currentValue = currentSnapshotDb.get(currentCf, key);
+
+                  // Byte-level comparison: only write if values are different
+                  if (!Arrays.equals(previousValue, currentValue)) {
+                    if (currentValue != null) {
+                      // Key exists in current snapshot - write the new value
+                      sstWriter.put(key, currentValue);
+                    } else {
+                      // Key doesn't exist in current snapshot (deleted) - write tombstone
+                      sstWriter.delete(key);
+                    }
+
+                    // Increment change counter
+                    cfChanges.getAndIncrement();
+                  }
+                } catch (Exception e) {
+                  LOG.error("Error processing key {} for column family {}: {}",
+                      keyStr, cfName, e.getMessage(), e);
+                  throw new RuntimeException(e);
+                }
+              });
+
+            } catch (RocksDBException e) {
+              LOG.error("RocksDB error reading keys from delta files for column family {}: {}",
+                  cfName, e.getMessage(), e);
+            }
+
+            LOG.debug("Finished writing {} changes for column family: {} to SST file",
+                cfChanges.get(), cfName);
+
+          } catch (Exception e) {
+            LOG.error("Error processing column family {} for snapshot {}: {}",
+                cfName, currentSnapshot.getName(), e.getMessage(), e);
+          }
+
+          // Ingest SST file into target database if there were changes
+          if (sstFile.exists() && sstFile.length() > 0) {
+            try {
+              targetTable.loadFromFile(sstFile);
+              LOG.info("Successfully ingested SST file for column family {}: {} changes",
+                  cfName, cfChanges);
+            } catch (Exception e) {
+              LOG.error("Failed to ingest SST file for column family {}: {}", cfName, e.getMessage(), e);
+            }
+          } else {
+            LOG.debug("No changes to ingest for column family {}", cfName);
+          }
+
+          // Clean up SST file after ingestion
+          try {
+            if (sstFile.exists()) {
+              Files.delete(sstFile.toPath());
+              LOG.debug("Cleaned up SST file: {}", sstFile.getAbsolutePath());
+            }
+          } catch (IOException e) {
+            LOG.warn("Failed to clean up SST file: {}", sstFile.getAbsolutePath(), e);
+          }
+
+          totalChanges += cfChanges.get();
+          LOG.debug("Applied {} incremental changes for column family: {}", cfChanges, cfName);
+        }
+      }
+
+      // Clean up temporary directories
+      try {
+        Files.deleteIfExists(Paths.get(tempSstDir));
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]