swamirishi commented on code in PR #8214:
URL: https://github.com/apache/ozone/pull/8214#discussion_r2080210531
##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -1200,6 +1249,109 @@ public void pruneSstFiles() {
}
}
+  /**
+   * Defines the task that removes OMKeyInfo from SST files from backup directory to
+   * save disk space.
+   */
+  public void pruneSstFileValues() {
+    if (!shouldRun()) {
+      return;
+    }
+
+    Path sstBackupDirPath = Paths.get(sstBackupDir);
+    Path prunedSSTFilePath = sstBackupDirPath.resolve(PRUNED_SST_FILE_TEMP);
+    try (ManagedOptions managedOptions = new ManagedOptions();
+        ManagedEnvOptions envOptions = new ManagedEnvOptions();
+        ManagedSstFileWriter sstFileWriter = new ManagedSstFileWriter(envOptions, managedOptions)) {
+      byte[] compactionLogEntryKey;
+      int batchCounter = 0;
+      while ((compactionLogEntryKey = pruneQueue.peek()) != null && ++batchCounter <= pruneSSTFileBatchSize) {
+        CompactionLogEntry compactionLogEntry;
+        synchronized (this) {
+          try {
+            compactionLogEntry = CompactionLogEntry.getCodec().fromPersistedFormat(
+                activeRocksDB.get().get(compactionLogTableCFHandle, compactionLogEntryKey));
+          } catch (RocksDBException ex) {
+            throw new RocksDatabaseException("Failed to get compaction log entry.", ex);
+          }
+        }
+        boolean shouldUpdateTable = false;
+        List<CompactionFileInfo> fileInfoList = compactionLogEntry.getInputFileInfoList();
+        List<CompactionFileInfo> updatedFileInfoList = new ArrayList<>();
+        for (CompactionFileInfo fileInfo : fileInfoList) {
+          if (fileInfo.isPruned()) {
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+          Path sstFilePath = sstBackupDirPath.resolve(fileInfo.getFileName() + ROCKSDB_SST_SUFFIX);
+          if (Files.notExists(sstFilePath)) {
+            LOG.debug("Skipping pruning SST file {} as it does not exist in backup directory.", sstFilePath);
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+
+          // Write the file.sst => pruned.sst.tmp
+          Files.deleteIfExists(prunedSSTFilePath);
+          try (ManagedRawSSTFileReader<Pair<byte[], Integer>> sstFileReader = new ManagedRawSSTFileReader<>(
+              managedOptions, sstFilePath.toFile().getAbsolutePath(), SST_READ_AHEAD_SIZE);
+              ManagedRawSSTFileIterator<Pair<byte[], Integer>> itr = sstFileReader.newIterator(
+                  keyValue -> Pair.of(keyValue.getKey(), keyValue.getType()), null, null)) {
+            sstFileWriter.open(prunedSSTFilePath.toFile().getAbsolutePath());
+            while (itr.hasNext()) {
+              Pair<byte[], Integer> keyValue = itr.next();
+              if (keyValue.getValue() == 0) {
+                sstFileWriter.delete(keyValue.getKey());
+              } else {
+                sstFileWriter.put(keyValue.getKey(), new byte[0]);
+              }
+            }
+          } catch (RocksDBException ex) {
+            throw new RocksDatabaseException("Failed to write pruned entries for " + sstFilePath, ex);
+          } finally {
+            try {
+              sstFileWriter.finish();
+            } catch (RocksDBException ex) {
+              throw new RocksDatabaseException("Failed to finish writing to " + prunedSSTFilePath, ex);
+            }
+          }
+
+          // Move file.sst.tmp to file.sst and replace existing file atomically
+          try (BootstrapStateHandler.Lock lock = getBootstrapStateLock().lock()) {
+            Files.move(prunedSSTFilePath, sstFilePath,
+                StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
+          }
+          shouldUpdateTable = true;
+          fileInfo.setPruned();
+          updatedFileInfoList.add(fileInfo);
+          LOG.debug("Completed pruning OMKeyInfo from {}", sstFilePath);
+        }
+
+        // Update Compaction Log table. Track keys that need updating.
+        if (shouldUpdateTable) {
+          CompactionLogEntry.Builder builder = new CompactionLogEntry.Builder(compactionLogEntry.getDbSequenceNumber(),
+              compactionLogEntry.getCompactionTime(), updatedFileInfoList, compactionLogEntry.getOutputFileInfoList());
+          String compactionReason = compactionLogEntry.getCompactionReason();
+          if (compactionReason != null) {
+            builder.setCompactionReason(compactionReason);
+          }
+          synchronized (this) {
Review Comment:
   Is this synchronized because of the DAG pruner? If so, this entire operation is still not thread safe: the DAG pruner could have deleted the entry by the time it is read here.
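   If the intent is only to guard the RocksDB read, one option is to re-check for a concurrently deleted entry and skip it. A minimal sketch of the loop body, reusing the identifiers from the diff (the skip-on-missing-entry policy is my assumption, not something in the PR):
   ```java
   // Re-check, under the same lock, that the entry still exists: the DAG pruner may have
   // removed it between the time the key was enqueued and now.
   byte[] rawEntry;
   synchronized (this) {
     try {
       rawEntry = activeRocksDB.get().get(compactionLogTableCFHandle, compactionLogEntryKey);
     } catch (RocksDBException ex) {
       throw new RocksDatabaseException("Failed to get compaction log entry.", ex);
     }
   }
   if (rawEntry == null) {
     // Entry was deleted concurrently; drop the stale key and continue with the next one.
     pruneQueue.poll();
     continue;
   }
   CompactionLogEntry compactionLogEntry = CompactionLogEntry.getCodec().fromPersistedFormat(rawEntry);
   ```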
##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -1200,6 +1249,109 @@ public void pruneSstFiles() {
}
}
+  /**
+   * Defines the task that removes OMKeyInfo from SST files from backup directory to
+   * save disk space.
+   */
+  public void pruneSstFileValues() {
+    if (!shouldRun()) {
+      return;
+    }
+
+    Path sstBackupDirPath = Paths.get(sstBackupDir);
+    Path prunedSSTFilePath = sstBackupDirPath.resolve(PRUNED_SST_FILE_TEMP);
+    try (ManagedOptions managedOptions = new ManagedOptions();
+        ManagedEnvOptions envOptions = new ManagedEnvOptions();
+        ManagedSstFileWriter sstFileWriter = new ManagedSstFileWriter(envOptions, managedOptions)) {
+      byte[] compactionLogEntryKey;
+      int batchCounter = 0;
+      while ((compactionLogEntryKey = pruneQueue.peek()) != null && ++batchCounter <= pruneSSTFileBatchSize) {
+        CompactionLogEntry compactionLogEntry;
+        synchronized (this) {
+          try {
+            compactionLogEntry = CompactionLogEntry.getCodec().fromPersistedFormat(
+                activeRocksDB.get().get(compactionLogTableCFHandle, compactionLogEntryKey));
+          } catch (RocksDBException ex) {
+            throw new RocksDatabaseException("Failed to get compaction log entry.", ex);
+          }
+        }
+        boolean shouldUpdateTable = false;
+        List<CompactionFileInfo> fileInfoList = compactionLogEntry.getInputFileInfoList();
+        List<CompactionFileInfo> updatedFileInfoList = new ArrayList<>();
+        for (CompactionFileInfo fileInfo : fileInfoList) {
+          if (fileInfo.isPruned()) {
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+          Path sstFilePath = sstBackupDirPath.resolve(fileInfo.getFileName() + ROCKSDB_SST_SUFFIX);
+          if (Files.notExists(sstFilePath)) {
+            LOG.debug("Skipping pruning SST file {} as it does not exist in backup directory.", sstFilePath);
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+
+          // Write the file.sst => pruned.sst.tmp
+          Files.deleteIfExists(prunedSSTFilePath);
+          try (ManagedRawSSTFileReader<Pair<byte[], Integer>> sstFileReader = new ManagedRawSSTFileReader<>(
+              managedOptions, sstFilePath.toFile().getAbsolutePath(), SST_READ_AHEAD_SIZE);
+              ManagedRawSSTFileIterator<Pair<byte[], Integer>> itr = sstFileReader.newIterator(
+                  keyValue -> Pair.of(keyValue.getKey(), keyValue.getType()), null, null)) {
+            sstFileWriter.open(prunedSSTFilePath.toFile().getAbsolutePath());
+            while (itr.hasNext()) {
+              Pair<byte[], Integer> keyValue = itr.next();
+              if (keyValue.getValue() == 0) {
+                sstFileWriter.delete(keyValue.getKey());
+              } else {
+                sstFileWriter.put(keyValue.getKey(), new byte[0]);
+              }
+            }
+          } catch (RocksDBException ex) {
+            throw new RocksDatabaseException("Failed to write pruned entries for " + sstFilePath, ex);
+          } finally {
+            try {
+              sstFileWriter.finish();
+            } catch (RocksDBException ex) {
+              throw new RocksDatabaseException("Failed to finish writing to " + prunedSSTFilePath, ex);
+            }
+          }
+
+          // Move file.sst.tmp to file.sst and replace existing file atomically
+          try (BootstrapStateHandler.Lock lock = getBootstrapStateLock().lock()) {
+            Files.move(prunedSSTFilePath, sstFilePath,
+                StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
+          }
+          shouldUpdateTable = true;
+          fileInfo.setPruned();
+          updatedFileInfoList.add(fileInfo);
+          LOG.debug("Completed pruning OMKeyInfo from {}", sstFilePath);
+        }
+
+        // Update Compaction Log table. Track keys that need updating.
+        if (shouldUpdateTable) {
+          CompactionLogEntry.Builder builder = new CompactionLogEntry.Builder(compactionLogEntry.getDbSequenceNumber(),
Review Comment:
   nit: Would prefer a toBuilder() method in the CompactionLogEntry class.
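   Something along these lines, as a rough sketch (the getters are the ones used in the diff above; `setInputFileInfoList(...)` on the builder would be a new, hypothetical setter):
   ```java
   // Hypothetical convenience method on CompactionLogEntry (not part of this PR):
   // copy all current fields into a Builder so callers only override what changed.
   public Builder toBuilder() {
     Builder builder = new Builder(getDbSequenceNumber(), getCompactionTime(),
         getInputFileInfoList(), getOutputFileInfoList());
     if (getCompactionReason() != null) {
       builder.setCompactionReason(getCompactionReason());
     }
     return builder;
   }
   ```
   The update above could then read roughly `compactionLogEntry.toBuilder().setInputFileInfoList(updatedFileInfoList).build()` (assuming such a setter is added) instead of re-assembling every field by hand.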
##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -1200,6 +1249,109 @@ public void pruneSstFiles() {
}
}
+  /**
+   * Defines the task that removes OMKeyInfo from SST files from backup directory to
+   * save disk space.
+   */
+  public void pruneSstFileValues() {
+    if (!shouldRun()) {
+      return;
+    }
+
+    Path sstBackupDirPath = Paths.get(sstBackupDir);
+    Path prunedSSTFilePath = sstBackupDirPath.resolve(PRUNED_SST_FILE_TEMP);
+    try (ManagedOptions managedOptions = new ManagedOptions();
+        ManagedEnvOptions envOptions = new ManagedEnvOptions();
+        ManagedSstFileWriter sstFileWriter = new ManagedSstFileWriter(envOptions, managedOptions)) {
+      byte[] compactionLogEntryKey;
+      int batchCounter = 0;
+      while ((compactionLogEntryKey = pruneQueue.peek()) != null && ++batchCounter <= pruneSSTFileBatchSize) {
+        CompactionLogEntry compactionLogEntry;
+        synchronized (this) {
+          try {
+            compactionLogEntry = CompactionLogEntry.getCodec().fromPersistedFormat(
+                activeRocksDB.get().get(compactionLogTableCFHandle, compactionLogEntryKey));
+          } catch (RocksDBException ex) {
+            throw new RocksDatabaseException("Failed to get compaction log entry.", ex);
+          }
+        }
+        boolean shouldUpdateTable = false;
+        List<CompactionFileInfo> fileInfoList = compactionLogEntry.getInputFileInfoList();
+        List<CompactionFileInfo> updatedFileInfoList = new ArrayList<>();
+        for (CompactionFileInfo fileInfo : fileInfoList) {
+          if (fileInfo.isPruned()) {
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+          Path sstFilePath = sstBackupDirPath.resolve(fileInfo.getFileName() + ROCKSDB_SST_SUFFIX);
+          if (Files.notExists(sstFilePath)) {
+            LOG.debug("Skipping pruning SST file {} as it does not exist in backup directory.", sstFilePath);
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+
+          // Write the file.sst => pruned.sst.tmp
+          Files.deleteIfExists(prunedSSTFilePath);
+          try (ManagedRawSSTFileReader<Pair<byte[], Integer>> sstFileReader = new ManagedRawSSTFileReader<>(
+              managedOptions, sstFilePath.toFile().getAbsolutePath(), SST_READ_AHEAD_SIZE);
+              ManagedRawSSTFileIterator<Pair<byte[], Integer>> itr = sstFileReader.newIterator(
+                  keyValue -> Pair.of(keyValue.getKey(), keyValue.getType()), null, null)) {
+            sstFileWriter.open(prunedSSTFilePath.toFile().getAbsolutePath());
+            while (itr.hasNext()) {
+              Pair<byte[], Integer> keyValue = itr.next();
+              if (keyValue.getValue() == 0) {
+                sstFileWriter.delete(keyValue.getKey());
+              } else {
+                sstFileWriter.put(keyValue.getKey(), new byte[0]);
+              }
+            }
+          } catch (RocksDBException ex) {
+            throw new RocksDatabaseException("Failed to write pruned entries for " + sstFilePath, ex);
+          } finally {
+            try {
+              sstFileWriter.finish();
+            } catch (RocksDBException ex) {
+              throw new RocksDatabaseException("Failed to finish writing to " + prunedSSTFilePath, ex);
Review Comment:
```suggestion
              throw new RocksDatabaseException("Failed to close SSTFileWriter writing to path : " + prunedSSTFilePath, ex);
```
##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -1200,6 +1249,109 @@ public void pruneSstFiles() {
}
}
+  /**
+   * Defines the task that removes OMKeyInfo from SST files from backup directory to
+   * save disk space.
+   */
+  public void pruneSstFileValues() {
+    if (!shouldRun()) {
+      return;
+    }
+
+    Path sstBackupDirPath = Paths.get(sstBackupDir);
+    Path prunedSSTFilePath = sstBackupDirPath.resolve(PRUNED_SST_FILE_TEMP);
+    try (ManagedOptions managedOptions = new ManagedOptions();
+        ManagedEnvOptions envOptions = new ManagedEnvOptions();
+        ManagedSstFileWriter sstFileWriter = new ManagedSstFileWriter(envOptions, managedOptions)) {
+      byte[] compactionLogEntryKey;
+      int batchCounter = 0;
+      while ((compactionLogEntryKey = pruneQueue.peek()) != null && ++batchCounter <= pruneSSTFileBatchSize) {
+        CompactionLogEntry compactionLogEntry;
+        synchronized (this) {
+          try {
+            compactionLogEntry = CompactionLogEntry.getCodec().fromPersistedFormat(
+                activeRocksDB.get().get(compactionLogTableCFHandle, compactionLogEntryKey));
+          } catch (RocksDBException ex) {
+            throw new RocksDatabaseException("Failed to get compaction log entry.", ex);
+          }
+        }
+        boolean shouldUpdateTable = false;
+        List<CompactionFileInfo> fileInfoList = compactionLogEntry.getInputFileInfoList();
+        List<CompactionFileInfo> updatedFileInfoList = new ArrayList<>();
+        for (CompactionFileInfo fileInfo : fileInfoList) {
+          if (fileInfo.isPruned()) {
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+          Path sstFilePath = sstBackupDirPath.resolve(fileInfo.getFileName() + ROCKSDB_SST_SUFFIX);
+          if (Files.notExists(sstFilePath)) {
+            LOG.debug("Skipping pruning SST file {} as it does not exist in backup directory.", sstFilePath);
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+
+          // Write the file.sst => pruned.sst.tmp
+          Files.deleteIfExists(prunedSSTFilePath);
+          try (ManagedRawSSTFileReader<Pair<byte[], Integer>> sstFileReader = new ManagedRawSSTFileReader<>(
+              managedOptions, sstFilePath.toFile().getAbsolutePath(), SST_READ_AHEAD_SIZE);
+              ManagedRawSSTFileIterator<Pair<byte[], Integer>> itr = sstFileReader.newIterator(
+                  keyValue -> Pair.of(keyValue.getKey(), keyValue.getType()), null, null)) {
+            sstFileWriter.open(prunedSSTFilePath.toFile().getAbsolutePath());
+            while (itr.hasNext()) {
+              Pair<byte[], Integer> keyValue = itr.next();
+              if (keyValue.getValue() == 0) {
+                sstFileWriter.delete(keyValue.getKey());
+              } else {
+                sstFileWriter.put(keyValue.getKey(), new byte[0]);
+              }
+            }
+          } catch (RocksDBException ex) {
+            throw new RocksDatabaseException("Failed to write pruned entries for " + sstFilePath, ex);
+          } finally {
+            try {
+              sstFileWriter.finish();
+            } catch (RocksDBException ex) {
+              throw new RocksDatabaseException("Failed to finish writing to " + prunedSSTFilePath, ex);
+            }
+          }
+
+          // Move file.sst.tmp to file.sst and replace existing file atomically
+          try (BootstrapStateHandler.Lock lock = getBootstrapStateLock().lock()) {
+            Files.move(prunedSSTFilePath, sstFilePath,
+                StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
+          }
+          shouldUpdateTable = true;
+          fileInfo.setPruned();
+          updatedFileInfoList.add(fileInfo);
+          LOG.debug("Completed pruning OMKeyInfo from {}", sstFilePath);
+        }
+
+        // Update Compaction Log table. Track keys that need updating.
+        if (shouldUpdateTable) {
Review Comment:
   Wouldn't this always be true if this line is reached, since we throw on all exceptions? It seems redundant.
##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -217,6 +235,19 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
         OZONE_OM_SNAPSHOT_PRUNE_COMPACTION_DAG_DAEMON_RUN_INTERVAL_DEFAULT,
         TimeUnit.MILLISECONDS);
+    this.pruneSSTFileBatchSize = configuration.getInt(
+        OZONE_OM_SNAPSHOT_PRUNE_COMPACTION_BACKUP_BATCH_SIZE,
+        OZONE_OM_SNAPSHOT_PRUNE_COMPACTION_BACKUP_BATCH_SIZE_DEFAULT);
+    try {
+      if (configuration.getBoolean(OZONE_OM_SNAPSHOT_LOAD_NATIVE_LIB, OZONE_OM_SNAPSHOT_LOAD_NATIVE_LIB_DEFAULT)
+          && ManagedRawSSTFileReader.loadLibrary()) {
+        pruneQueue = new LinkedList<>();
Review Comment:
   This is not thread safe. Use either a synchronized queue or a ConcurrentLinkedQueue.
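   Roughly like this (a sketch only; the `byte[]` element type is inferred from how `pruneSstFileValues()` peeks raw compaction-log keys off the queue, and the unconditional initialization is just for brevity here):
   ```java
   // java.util.Queue / java.util.concurrent.ConcurrentLinkedQueue:
   // lock-free, thread-safe offer/peek/poll for the compaction-listener producer
   // and pruner consumer, with no external synchronization on the queue itself.
   private final Queue<byte[]> pruneQueue = new ConcurrentLinkedQueue<>();
   ```
   A `Collections.synchronizedCollection`-style wrapper would also work, but `ConcurrentLinkedQueue` avoids locking on the hot peek/poll path.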
##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -1200,6 +1249,109 @@ public void pruneSstFiles() {
}
}
+  /**
+   * Defines the task that removes OMKeyInfo from SST files from backup directory to
+   * save disk space.
+   */
+  public void pruneSstFileValues() {
+    if (!shouldRun()) {
+      return;
+    }
+
+    Path sstBackupDirPath = Paths.get(sstBackupDir);
+    Path prunedSSTFilePath = sstBackupDirPath.resolve(PRUNED_SST_FILE_TEMP);
+    try (ManagedOptions managedOptions = new ManagedOptions();
+        ManagedEnvOptions envOptions = new ManagedEnvOptions();
+        ManagedSstFileWriter sstFileWriter = new ManagedSstFileWriter(envOptions, managedOptions)) {
+      byte[] compactionLogEntryKey;
+      int batchCounter = 0;
+      while ((compactionLogEntryKey = pruneQueue.peek()) != null && ++batchCounter <= pruneSSTFileBatchSize) {
+        CompactionLogEntry compactionLogEntry;
+        synchronized (this) {
+          try {
+            compactionLogEntry = CompactionLogEntry.getCodec().fromPersistedFormat(
+                activeRocksDB.get().get(compactionLogTableCFHandle, compactionLogEntryKey));
+          } catch (RocksDBException ex) {
+            throw new RocksDatabaseException("Failed to get compaction log entry.", ex);
+          }
+        }
+        boolean shouldUpdateTable = false;
+        List<CompactionFileInfo> fileInfoList = compactionLogEntry.getInputFileInfoList();
+        List<CompactionFileInfo> updatedFileInfoList = new ArrayList<>();
+        for (CompactionFileInfo fileInfo : fileInfoList) {
+          if (fileInfo.isPruned()) {
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+          Path sstFilePath = sstBackupDirPath.resolve(fileInfo.getFileName() + ROCKSDB_SST_SUFFIX);
+          if (Files.notExists(sstFilePath)) {
+            LOG.debug("Skipping pruning SST file {} as it does not exist in backup directory.", sstFilePath);
+            updatedFileInfoList.add(fileInfo);
+            continue;
+          }
+
+          // Write the file.sst => pruned.sst.tmp
+          Files.deleteIfExists(prunedSSTFilePath);
+          try (ManagedRawSSTFileReader<Pair<byte[], Integer>> sstFileReader = new ManagedRawSSTFileReader<>(
+              managedOptions, sstFilePath.toFile().getAbsolutePath(), SST_READ_AHEAD_SIZE);
+              ManagedRawSSTFileIterator<Pair<byte[], Integer>> itr = sstFileReader.newIterator(
+                  keyValue -> Pair.of(keyValue.getKey(), keyValue.getType()), null, null)) {
+            sstFileWriter.open(prunedSSTFilePath.toFile().getAbsolutePath());
+            while (itr.hasNext()) {
+              Pair<byte[], Integer> keyValue = itr.next();
+              if (keyValue.getValue() == 0) {
+                sstFileWriter.delete(keyValue.getKey());
+              } else {
+                sstFileWriter.put(keyValue.getKey(), new byte[0]);
+              }
+            }
+          } catch (RocksDBException ex) {
+            throw new RocksDatabaseException("Failed to write pruned entries for " + sstFilePath, ex);
+          } finally {
+            try {
+              sstFileWriter.finish();
+            } catch (RocksDBException ex) {
+              throw new RocksDatabaseException("Failed to finish writing to " + prunedSSTFilePath, ex);
+            }
+          }
+
+          // Move file.sst.tmp to file.sst and replace existing file atomically
+          try (BootstrapStateHandler.Lock lock = getBootstrapStateLock().lock()) {
+            Files.move(prunedSSTFilePath, sstFilePath,
+                StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
+          }
+          shouldUpdateTable = true;
+          fileInfo.setPruned();
+          updatedFileInfoList.add(fileInfo);
+          LOG.debug("Completed pruning OMKeyInfo from {}", sstFilePath);
+        }
+
+        // Update Compaction Log table. Track keys that need updating.
+        if (shouldUpdateTable) {
+          CompactionLogEntry.Builder builder = new CompactionLogEntry.Builder(compactionLogEntry.getDbSequenceNumber(),
+              compactionLogEntry.getCompactionTime(), updatedFileInfoList, compactionLogEntry.getOutputFileInfoList());
+          String compactionReason = compactionLogEntry.getCompactionReason();
+          if (compactionReason != null) {
+            builder.setCompactionReason(compactionReason);
+          }
+          synchronized (this) {
+            try {
+              activeRocksDB.get().put(compactionLogTableCFHandle, compactionLogEntryKey,
+                  builder.build().getProtobuf().toByteArray());
+            } catch (RocksDBException ex) {
+              throw new RocksDatabaseException("Failed to update the compaction log table for entry: "
+                  + compactionLogEntry, ex);
+            }
+          }
+        }
+        pruneQueue.poll();
+      }
+      Files.deleteIfExists(prunedSSTFilePath);
Review Comment:
   Why delete here? The temp file should not exist at this point. Do this at the beginning, before writing the SST file.
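   For reference, a minimal sketch of the suggested ordering (identifiers reused from the diff; this is my reading of the comment, not code from the PR):
   ```java
   // Delete any stale temp file once, up front (e.g. left over from an interrupted run),
   // instead of again after the batch loop.
   Path prunedSSTFilePath = sstBackupDirPath.resolve(PRUNED_SST_FILE_TEMP);
   Files.deleteIfExists(prunedSSTFilePath);
   while ((compactionLogEntryKey = pruneQueue.peek()) != null && ++batchCounter <= pruneSSTFileBatchSize) {
     // ... per-file pruning as in the diff: write pruned.sst.tmp, then atomically move it
     // onto file.sst, so no temp file is left behind when the loop exits ...
   }
   // No trailing Files.deleteIfExists(prunedSSTFilePath) needed here.
   ```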