swamirishi commented on code in PR #9324:
URL: https://github.com/apache/ozone/pull/9324#discussion_r2557533659


##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/defrag/SnapshotDefragService.java:
##########
@@ -135,48 +196,322 @@ private boolean isRocksToolsNativeLibAvailable() {
   }
 
   /**
-   * Checks if a snapshot needs defragmentation by examining its YAML metadata.
+   * Determines whether the specified snapshot requires defragmentation and 
returns
+   * a pair indicating the need for defragmentation and the corresponding 
version of the snapshot.
+   *
+   * @param snapshotInfo Information about the snapshot to be checked for 
defragmentation.
+   * @return A pair containing a boolean value and an integer:
+   *         - The boolean value indicates whether the snapshot requires 
defragmentation
+   *         (true if needed, false otherwise).
+   *         - The integer represents the version of the snapshot being 
evaluated.
+   * @throws IOException If an I/O error occurs while accessing the local 
snapshot data or metadata.
    */
-  private boolean needsDefragmentation(SnapshotInfo snapshotInfo) {
-    if (!SstFilteringService.isSstFiltered(conf, snapshotInfo)) {
-      return false;
-    }
-    try (OmSnapshotLocalDataManager.ReadableOmSnapshotLocalDataProvider 
readableOmSnapshotLocalDataProvider =
-             
ozoneManager.getOmSnapshotManager().getSnapshotLocalDataManager().getOmSnapshotLocalData(snapshotInfo))
 {
-      Path snapshotPath = OmSnapshotManager.getSnapshotPath(
-          ozoneManager.getMetadataManager(), snapshotInfo,
-          
readableOmSnapshotLocalDataProvider.getSnapshotLocalData().getVersion());
+  private Pair<Boolean, Integer> needsDefragmentation(SnapshotInfo 
snapshotInfo) throws IOException {
+    // Update snapshot local metadata to point to the correct previous 
snapshotId if it was different and check if
+    // snapshot needs defrag.
+    try (WritableOmSnapshotLocalDataProvider 
writableOmSnapshotLocalDataProvider =
+             
snapshotLocalDataManager.getWritableOmSnapshotLocalData(snapshotInfo)) {
       // Read snapshot local metadata from YAML
       // Check if snapshot needs compaction (defragmentation)
-      boolean needsDefrag = readableOmSnapshotLocalDataProvider.needsDefrag();
-      LOG.debug("Snapshot {} needsDefragmentation field value: {}", 
snapshotInfo.getName(), needsDefrag);
+      writableOmSnapshotLocalDataProvider.commit();
+      boolean needsDefrag = writableOmSnapshotLocalDataProvider.needsDefrag();
+      OmSnapshotLocalData localData = 
writableOmSnapshotLocalDataProvider.getSnapshotLocalData();
+      if (!needsDefrag) {
+        OmSnapshotLocalData previousLocalData = 
writableOmSnapshotLocalDataProvider.getPreviousSnapshotLocalData();
+        LOG.debug("Skipping defragmentation since snapshot has already been 
defragmented: id : {}(version: {}=>{}) " +
+                "previousId: {}(version: {})", snapshotInfo.getSnapshotId(), 
localData.getVersion(),
+            
localData.getVersionSstFileInfos().get(localData.getVersion()).getPreviousSnapshotVersion(),
+            snapshotInfo.getPathPreviousSnapshotId(), 
previousLocalData.getVersion());
+      } else {
+        LOG.debug("Snapshot {} needsDefragmentation field value: true", 
snapshotInfo.getSnapshotId());
+      }
+      return Pair.of(needsDefrag, localData.getVersion());
+    }
+  }
 
-      return needsDefrag;
-    } catch (IOException e) {
-      LOG.warn("Failed to read YAML metadata for snapshot {}, assuming defrag 
needed",
-          snapshotInfo.getName(), e);
-      return true;
+  private Pair<String, String> getTableBounds(Table<String, ?> table) throws 
RocksDatabaseException, CodecException {
+    String tableLowestValue = null, tableHighestValue = null;
+    try (TableIterator<String, String> keyIterator = table.keyIterator()) {
+      if (keyIterator.hasNext()) {
+        // Setting the lowest value to the first key in the table.
+        tableLowestValue = keyIterator.next();
+      }
+      keyIterator.seekToLast();
+      if (keyIterator.hasNext()) {
+        // Setting the highest value to the last key in the table.
+        tableHighestValue = keyIterator.next();
+      }
     }
+    return Pair.of(tableLowestValue, tableHighestValue);
   }
 
   /**
-   * Performs full defragmentation for the first snapshot in the chain.
-   * This is a simplified implementation that demonstrates the concept.
+   * Performs a full defragmentation process for specified tables in the 
metadata manager.
+   * This method processes all the entries in the tables for the provided 
prefix information,
+   * deletes specified key ranges, and compacts the tables to remove 
tombstones.
+   *
+   * @param checkpointDBStore the metadata manager responsible for managing 
tables during the checkpoint process
+   * @param prefixInfo the prefix information used to identify bucket prefix 
and determine key ranges in the tables
+   * @param incrementalTables the set of tables for which incremental 
defragmentation is performed.
+   * @throws IOException if an I/O error occurs during table operations or 
compaction
    */
-  private void performFullDefragmentation(SnapshotInfo snapshotInfo,
-      OmSnapshot omSnapshot) throws IOException {
-
-    // TODO: Implement full defragmentation
+  private void performFullDefragmentation(DBStore checkpointDBStore, 
TablePrefixInfo prefixInfo,
+      Set<String> incrementalTables) throws IOException {
+    for (String table : incrementalTables) {
+      Table<String, byte[]> checkpointTable = 
checkpointDBStore.getTable(table, StringCodec.get(),
+          ByteArrayCodec.get());
+      String tableBucketPrefix = prefixInfo.getTablePrefix(table);
+      String prefixUpperBound = 
getLexicographicallyHigherString(tableBucketPrefix);
+
+      Pair<String, String> tableBounds = getTableBounds(checkpointTable);
+      String tableLowestValue = tableBounds.getLeft();
+      String tableHighestValue = tableBounds.getRight();
+
+      // If lowest value is not null and if the bucket prefix corresponding to 
the table is greater than lower then
+      // delete the range between lowest value and bucket prefix.
+      if (tableLowestValue != null && 
tableLowestValue.compareTo(tableBucketPrefix) < 0) {
+        checkpointTable.deleteRange(tableLowestValue, tableBucketPrefix);
+      }
+      // If highest value is not null and if the next higher lexicographical 
string of bucket prefix corresponding to
+      // the table is less than equal to the highest value then delete the 
range between bucket prefix
+      // and also the highest value.
+      if (tableHighestValue != null && 
tableHighestValue.compareTo(prefixUpperBound) >= 0) {
+        checkpointTable.deleteRange(prefixUpperBound, tableHighestValue);
+        checkpointTable.delete(tableHighestValue);
+      }
+      // Compact the table completely with kForce to get rid of tombstones.
+      try (ManagedCompactRangeOptions compactRangeOptions = new 
ManagedCompactRangeOptions()) {
+        
compactRangeOptions.setBottommostLevelCompaction(ManagedCompactRangeOptions.BottommostLevelCompaction.kForce);
+        compactRangeOptions.setExclusiveManualCompaction(true);
+        checkpointDBStore.compactTable(table, compactRangeOptions);
+      }
+    }
   }
 
   /**
-   * Performs incremental defragmentation using diff from previous 
defragmented snapshot.
+   * Performs an incremental defragmentation process, which involves 
determining
+   * and processing delta files between snapshots for metadata updates. The 
method
+   * computes the changes, manages file ingestion to the checkpoint metadata 
manager,
+   * and ensures that all delta files are deleted after processing.
+   *
+   * @param previousSnapshotInfo information about the previous snapshot.
+   * @param snapshotInfo information about the current snapshot for which
+   *                     incremental defragmentation is performed.
+   * @param snapshotVersion the version of the snapshot to be processed.
+   * @param checkpointStore the dbStore instance where data
+   *                        updates are ingested after being processed.
+   * @param bucketPrefixInfo table prefix information associated with buckets,
+   *                         used to determine bounds for processing keys.
+   * @param incrementalTables the set of tables for which incremental 
defragmentation is performed.
+   * @throws IOException if an I/O error occurs during processing.
    */
-  private void performIncrementalDefragmentation(SnapshotInfo currentSnapshot,
-      SnapshotInfo previousDefraggedSnapshot, OmSnapshot currentOmSnapshot)
+  private void performIncrementalDefragmentation(SnapshotInfo 
previousSnapshotInfo, SnapshotInfo snapshotInfo,
+      int snapshotVersion, DBStore checkpointStore, TablePrefixInfo 
bucketPrefixInfo, Set<String> incrementalTables)
       throws IOException {
+    // Map of delta files grouped on the basis of the tableName.
+    Collection<Pair<Path, SstFileInfo>> allTableDeltaFiles = 
this.deltaDiffComputer.getDeltaFiles(
+        previousSnapshotInfo, snapshotInfo, incrementalTables);
+
+    Map<String, List<Pair<Path, SstFileInfo>>> tableGroupedDeltaFiles = 
allTableDeltaFiles.stream()
+        .collect(Collectors.groupingBy(pair -> 
pair.getValue().getColumnFamily()));
+
+    String volumeName = snapshotInfo.getVolumeName();
+    String bucketName = snapshotInfo.getBucketName();
+
+    Set<Path> filesToBeDeleted = new HashSet<>();
+    // All files computed as delta must be deleted irrespective of whether 
ingestion succeeded or not.
+    allTableDeltaFiles.forEach(pair -> filesToBeDeleted.add(pair.getKey()));
+    try (UncheckedAutoCloseableSupplier<OmSnapshot> snapshot = 
omSnapshotManager.getActiveSnapshot(volumeName,
+        bucketName, snapshotInfo.getName());
+         UncheckedAutoCloseableSupplier<OmSnapshot> previousSnapshot = 
omSnapshotManager.getActiveSnapshot(
+             volumeName, bucketName, previousSnapshotInfo.getName())) {
+      for (Map.Entry<String, List<Pair<Path, SstFileInfo>>> entry : 
tableGroupedDeltaFiles.entrySet()) {
+        String table = entry.getKey();
+        List<Pair<Path, SstFileInfo>> deltaFiles = entry.getValue();
+        Path fileToBeIngested;
+        if (deltaFiles.size() == 1 && snapshotVersion > 0) {
+          // If there is only one delta file for the table and the snapshot 
version is also not 0 then the same delta
+          // file can reingested into the checkpointStore.
+          fileToBeIngested = deltaFiles.get(0).getKey();
+        } else {
+          Table<String, byte[]> snapshotTable = 
snapshot.get().getMetadataManager().getStore()
+              .getTable(table, StringCodec.get(), ByteArrayCodec.get());
+          Table<String, byte[]> previousSnapshotTable = 
previousSnapshot.get().getMetadataManager().getStore()
+              .getTable(table, StringCodec.get(), ByteArrayCodec.get());
+
+          String tableBucketPrefix = bucketPrefixInfo.getTablePrefix(table);
+          String sstFileReaderLowerBound = 
bucketPrefixInfo.getTablePrefix(entry.getKey());
+          String sstFileReaderUpperBound = null;
+          if (Strings.isNotEmpty(sstFileReaderLowerBound)) {
+            sstFileReaderUpperBound = 
getLexicographicallyHigherString(sstFileReaderLowerBound);
+          }
+          List<Path> deltaFilePaths = 
deltaFiles.stream().map(Pair::getKey).collect(Collectors.toList());
+          SstFileSetReader sstFileSetReader = new 
SstFileSetReader(deltaFilePaths);
+          fileToBeIngested = differTmpDir.resolve(table + "-" + 
UUID.randomUUID() + SST_FILE_EXTENSION);
+          // Delete all delta files after reingesting into the checkpointStore.
+          filesToBeDeleted.add(fileToBeIngested);
+          int deltaEntriesCount = 0;
+          try (ClosableIterator<String> keysToCheck =
+                   
sstFileSetReader.getKeyStreamWithTombstone(sstFileReaderLowerBound, 
sstFileReaderUpperBound);
+               TableMergeIterator<String, byte[]> tableMergeIterator = new 
TableMergeIterator<>(keysToCheck,
+                  tableBucketPrefix, snapshotTable, previousSnapshotTable);
+               RDBSstFileWriter rdbSstFileWriter = new 
RDBSstFileWriter(fileToBeIngested.toFile())) {
+            while (tableMergeIterator.hasNext()) {
+              Table.KeyValue<String, List<byte[]>> kvs = 
tableMergeIterator.next();
+              // Check if the values are equal or if they are not equal then 
the value should be written to the
+              // delta sstFile.
+              if (!Arrays.equals(kvs.getValue().get(0), 
kvs.getValue().get(1))) {
+                try (CodecBuffer key = 
StringCodec.get().toHeapCodecBuffer(kvs.getKey())) {
+                  byte[] keyArray = key.asReadOnlyByteBuffer().array();
+                  byte[] val = kvs.getValue().get(0);
+                  // If the value is null then add a tombstone to the delta 
sstFile.
+                  if (val == null) {
+                    rdbSstFileWriter.delete(keyArray);
+                  } else {
+                    rdbSstFileWriter.put(keyArray, val);
+                  }
+                }
+                deltaEntriesCount++;
+              }
+            }
+          } catch (RocksDBException e) {
+            throw new RocksDatabaseException("Error while reading sst files.", 
e);
+          }
+          if (deltaEntriesCount == 0) {
+            // If there are no delta entries then delete the delta file. No 
need to ingest the file as a diff.
+            fileToBeIngested = null;
+          }
+        }
+        if (fileToBeIngested != null) {
+          if (!fileToBeIngested.toFile().exists()) {
+            throw new IOException("Delta file does not exist: " + 
fileToBeIngested);
+          }
+          Table checkpointTable = checkpointStore.getTable(table);
+          checkpointTable.loadFromFile(fileToBeIngested.toFile());

Review Comment:
   LoadFile if it fails would throw a ROcksDataBaseException the entire defrag 
would fail



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to