smengcl commented on code in PR #9324:
URL: https://github.com/apache/ozone/pull/9324#discussion_r2558906482
##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/defrag/SnapshotDefragService.java:
##########
@@ -135,48 +196,318 @@ private boolean isRocksToolsNativeLibAvailable() {
}
/**
-   * Checks if a snapshot needs defragmentation by examining its YAML metadata.
+   * Determines whether the specified snapshot requires defragmentation and returns
+   * a pair of the defragmentation decision and the corresponding snapshot version.
+   *
+   * @param snapshotInfo Information about the snapshot to be checked for defragmentation.
+   * @return A pair containing a boolean value and an integer:
+   *         - The boolean value indicates whether the snapshot requires defragmentation
+   *           (true if needed, false otherwise).
+   *         - The integer represents the version of the snapshot being evaluated.
+   * @throws IOException If an I/O error occurs while accessing the local snapshot data or metadata.
*/
- private boolean needsDefragmentation(SnapshotInfo snapshotInfo) {
- if (!SstFilteringService.isSstFiltered(conf, snapshotInfo)) {
- return false;
- }
-    try (OmSnapshotLocalDataManager.ReadableOmSnapshotLocalDataProvider readableOmSnapshotLocalDataProvider =
-        ozoneManager.getOmSnapshotManager().getSnapshotLocalDataManager().getOmSnapshotLocalData(snapshotInfo)) {
-      Path snapshotPath = OmSnapshotManager.getSnapshotPath(
-          ozoneManager.getMetadataManager(), snapshotInfo,
-          readableOmSnapshotLocalDataProvider.getSnapshotLocalData().getVersion());
+  @VisibleForTesting
+  Pair<Boolean, Integer> needsDefragmentation(SnapshotInfo snapshotInfo) throws IOException {
+    // Update the snapshot local metadata to point to the correct previous snapshotId if it changed,
+    // and check whether the snapshot needs defragmentation.
+    try (WritableOmSnapshotLocalDataProvider writableOmSnapshotLocalDataProvider =
+        snapshotLocalDataManager.getWritableOmSnapshotLocalData(snapshotInfo)) {
// Read snapshot local metadata from YAML
// Check if snapshot needs compaction (defragmentation)
- boolean needsDefrag = readableOmSnapshotLocalDataProvider.needsDefrag();
- LOG.debug("Snapshot {} needsDefragmentation field value: {}",
snapshotInfo.getName(), needsDefrag);
+ writableOmSnapshotLocalDataProvider.commit();
+ boolean needsDefrag = writableOmSnapshotLocalDataProvider.needsDefrag();
+ OmSnapshotLocalData localData =
writableOmSnapshotLocalDataProvider.getSnapshotLocalData();
+ if (!needsDefrag) {
+ OmSnapshotLocalData previousLocalData =
writableOmSnapshotLocalDataProvider.getPreviousSnapshotLocalData();
+ LOG.debug("Skipping defragmentation since snapshot has already been
defragmented: id : {}(version: {}=>{}) " +
+ "previousId: {}(version: {})", snapshotInfo.getSnapshotId(),
localData.getVersion(),
+
localData.getVersionSstFileInfos().get(localData.getVersion()).getPreviousSnapshotVersion(),
+ snapshotInfo.getPathPreviousSnapshotId(),
previousLocalData.getVersion());
+ } else {
+ LOG.debug("Snapshot {} needsDefragmentation field value: true",
snapshotInfo.getSnapshotId());
+ }
+ return Pair.of(needsDefrag, localData.getVersion());
+ }
+ }
- return needsDefrag;
- } catch (IOException e) {
- LOG.warn("Failed to read YAML metadata for snapshot {}, assuming defrag
needed",
- snapshotInfo.getName(), e);
- return true;
+ private Pair<String, String> getTableBounds(Table<String, ?> table) throws
RocksDatabaseException, CodecException {
+ String tableLowestValue = null, tableHighestValue = null;
+ try (TableIterator<String, String> keyIterator = table.keyIterator()) {
+ if (keyIterator.hasNext()) {
+ // Setting the lowest value to the first key in the table.
+ tableLowestValue = keyIterator.next();
+ }
+ keyIterator.seekToLast();
+ if (keyIterator.hasNext()) {
+ // Setting the highest value to the last key in the table.
+ tableHighestValue = keyIterator.next();
+ }
}
+ return Pair.of(tableLowestValue, tableHighestValue);
}
/**
- * Performs full defragmentation for the first snapshot in the chain.
- * This is a simplified implementation that demonstrates the concept.
+   * Performs a full defragmentation pass over the specified tables of a snapshot checkpoint.
+   * For each table, it determines the key range covered by the bucket prefix, deletes all keys
+   * outside that range, and compacts the table to remove the resulting tombstones.
+   *
+   * @param checkpointDBStore the DB store of the snapshot checkpoint whose tables are defragmented
+   * @param prefixInfo the prefix information used to identify the bucket prefix and determine key ranges in the tables
+   * @param incrementalTables the set of tables on which defragmentation is performed
+   * @throws IOException if an I/O error occurs during table operations or compaction
*/
- private void performFullDefragmentation(SnapshotInfo snapshotInfo,
- OmSnapshot omSnapshot) throws IOException {
+ @VisibleForTesting
+  void performFullDefragmentation(DBStore checkpointDBStore, TablePrefixInfo prefixInfo,
+      Set<String> incrementalTables) throws IOException {
+ for (String table : incrementalTables) {
+ Table<String, byte[]> checkpointTable =
checkpointDBStore.getTable(table, StringCodec.get(),
+ ByteArrayCodec.get());
+ String tableBucketPrefix = prefixInfo.getTablePrefix(table);
+      String prefixUpperBound = getLexicographicallyHigherString(tableBucketPrefix);
+
+ Pair<String, String> tableBounds = getTableBounds(checkpointTable);
+ String tableLowestValue = tableBounds.getLeft();
+ String tableHighestValue = tableBounds.getRight();
+
+      // If the lowest key is not null and is lexicographically lower than the bucket prefix,
+      // delete the range from the lowest key up to the bucket prefix.
+      if (tableLowestValue != null && tableLowestValue.compareTo(tableBucketPrefix) < 0) {
+        checkpointTable.deleteRange(tableLowestValue, tableBucketPrefix);
+      }
+      // If the highest key is not null and the next lexicographically higher string of the bucket
+      // prefix is less than or equal to it, delete the range from that upper bound to the highest
+      // key, then delete the highest key itself since deleteRange excludes its end key.
+      if (tableHighestValue != null && tableHighestValue.compareTo(prefixUpperBound) >= 0) {
+        checkpointTable.deleteRange(prefixUpperBound, tableHighestValue);
+        checkpointTable.delete(tableHighestValue);
+      }
+ // Compact the table completely with kForce to get rid of tombstones.
+      try (ManagedCompactRangeOptions compactRangeOptions = new ManagedCompactRangeOptions()) {
+        compactRangeOptions.setBottommostLevelCompaction(ManagedCompactRangeOptions.BottommostLevelCompaction.kForce);
+ compactRangeOptions.setExclusiveManualCompaction(true);
+ checkpointDBStore.compactTable(table, compactRangeOptions);
+ }
+ }
+ }
- // TODO: Implement full defragmentation
+  /**
+   * Spills the table difference into an SST file based on the provided delta file paths,
+   * the current snapshot table, the previous snapshot table, and an optional table key prefix.
+   *
+   * The method reads the delta files and compares the records against the snapshot tables.
+   * Any differences, including tombstones (deleted entries), are written to a new SST file.
+   *
+   * @param deltaFilePaths the list of paths to the delta files to process
+   * @param snapshotTable the current snapshot table for comparison
+   * @param previousSnapshotTable the previous snapshot table for comparison
+   * @param tableKeyPrefix the prefix for filtering keys, or null if all keys are to be included
+   * @return a pair of the path of the created SST file containing the differences and a boolean
+   *         indicating whether any delta entries were written (true if there are differences, false otherwise)
+   * @throws IOException if an I/O error occurs during processing
+   */
+ @VisibleForTesting
+ Pair<Path, Boolean> spillTableDiffIntoSstFile(List<Path> deltaFilePaths,
Table<String, byte[]> snapshotTable,
Review Comment:
We could move this and the other helper methods to a separate static helper class to reduce the length of the SnapshotDefragService class; see the sketch below.
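
For illustration, a minimal sketch of what that extraction could look like. The class name `SnapshotDefragUtils`, its location, and the `throws IOException` clause are assumptions for the sketch, not part of this PR; the body mirrors the `getTableBounds` logic from the diff above.

```java
// Hypothetical sketch only; names and throws clauses are suggestions, not part of this PR.
package org.apache.hadoop.ozone.om.snapshot.defrag;

import java.io.IOException;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;

/** Static helpers factored out of SnapshotDefragService so the service keeps only orchestration logic. */
public final class SnapshotDefragUtils {

  private SnapshotDefragUtils() {
    // Utility class, no instances.
  }

  /** Returns the (lowest, highest) keys of the table; either side is null when the table is empty. */
  public static Pair<String, String> getTableBounds(Table<String, ?> table) throws IOException {
    String lowest = null;
    String highest = null;
    try (TableIterator<String, String> keyIterator = table.keyIterator()) {
      if (keyIterator.hasNext()) {
        // First key in iteration order is the lowest key.
        lowest = keyIterator.next();
      }
      keyIterator.seekToLast();
      if (keyIterator.hasNext()) {
        // Last key in iteration order is the highest key.
        highest = keyIterator.next();
      }
    }
    return Pair.of(lowest, highest);
  }

  // performFullDefragmentation and spillTableDiffIntoSstFile could move here as well, taking the
  // DBStore / tables they operate on as parameters instead of reading them from service state.
}
```

Keeping the helpers static and parameterized would also let them be unit-tested without constructing the whole service.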