swamirishi commented on code in PR #5236:
URL: https://github.com/apache/ozone/pull/5236#discussion_r1310950495


##########
hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDatabase.java:
##########
@@ -949,46 +948,42 @@ private int getLastLevel() throws IOException {
   /**
    * Deletes sst files which do not correspond to prefix
    * for given table.
-   * @param prefixPairs, a list of pair (TableName,prefixUsed).
+   * @param prefixPairs, a map of TableName to prefixUsed.
    */
-  public void deleteFilesNotMatchingPrefix(
-      List<Pair<String, String>> prefixPairs,
-      BooleanTriFunction<String, String, String, Boolean> filterFunction)
+  public void deleteFilesNotMatchingPrefix(Map<String, String> prefixPairs)
       throws IOException, RocksDBException {
     assertClose();
     for (LiveFileMetaData liveFileMetaData : getSstFileList()) {
       String sstFileColumnFamily =
           new String(liveFileMetaData.columnFamilyName(),
-              StandardCharsets.UTF_8);
+              UTF_8);
       int lastLevel = getLastLevel();
-      for (Pair<String, String> prefixPair : prefixPairs) {
-        String columnFamily = prefixPair.getKey();
-        String prefixForColumnFamily = prefixPair.getValue();
-        if (!sstFileColumnFamily.equals(columnFamily)) {
-          continue;
-        }
-        // RocksDB #deleteFile API allows only to delete the last level of
-        // SST Files. Any level < last level won't get deleted and
-        // only last file of level 0 can be deleted
-        // and will throw warning in the rocksdb manifest.
-        // Instead, perform the level check here
-        // itself to avoid failed delete attempts for lower level files.
-        if (liveFileMetaData.level() != lastLevel || lastLevel == 0) {
-          continue;
-        }
-        String firstDbKey =
-            new String(liveFileMetaData.smallestKey(), StandardCharsets.UTF_8);
-        String lastDbKey =
-            new String(liveFileMetaData.largestKey(), StandardCharsets.UTF_8);
-        boolean isKeyWithPrefixPresent =
-            filterFunction.apply(firstDbKey, lastDbKey, prefixForColumnFamily);
-        if (!isKeyWithPrefixPresent) {
-          LOG.info("Deleting sst file {} corresponding to column family"
-                  + " {} from db: {}", liveFileMetaData.fileName(),
-              StringUtils.bytes2String(liveFileMetaData.columnFamilyName()),
-              db.get().getName());
-          db.deleteFile(liveFileMetaData);
-        }
+
+      if (!prefixPairs.containsKey(sstFileColumnFamily)) {
+        continue;
+      }
+
+      // RocksDB #deleteFile API allows only to delete the last level of
+      // SST Files. Any level < last level won't get deleted and
+      // only last file of level 0 can be deleted
+      // and will throw warning in the rocksdb manifest.
+      // Instead, perform the level check here
+      // itself to avoid failed delete attempts for lower level files.
+      if (liveFileMetaData.level() != lastLevel || lastLevel == 0) {
+        continue;
+      }
+
+      String prefixForColumnFamily = prefixPairs.get(sstFileColumnFamily);
+      String firstDbKey = new String(liveFileMetaData.smallestKey(), UTF_8);
+      String lastDbKey = new String(liveFileMetaData.largestKey(), UTF_8);
+      boolean isKeyWithPrefixPresent = RocksDiffUtils.isKeyWithPrefixPresent(
+          prefixForColumnFamily, firstDbKey, lastDbKey);
+      if (!isKeyWithPrefixPresent) {
+        LOG.info("Deleting sst file {} corresponding to column family"

Review Comment:
   nit: Nothing related to this change. It would be great if you could add the 
expectedPrefix in the log message. It would be good for debugging.



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java:
##########
@@ -1030,7 +1043,8 @@ synchronized void internalGetSSTDiffList(
             continue;
           }
 

Review Comment:
   ```suggestion
   if (shouldSkipNode(current, columnFamilyToPrefixMap)) {
                 LOG.debug("Current Level node: '{}' has keys from startKey: 
'{}' " +
                         "and endKey: '{}'. columnFamilyToPrefixMap is : {}.",
                     current.getFileName(), current.getStartKey(), 
current.getEndKey(),
                     columnFamilyToPrefixMap);
                 continue;
               }
   ```



##########
hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SstFilteringService.java:
##########
@@ -192,8 +176,10 @@ public BackgroundTaskResult call() throws Exception {
             LOG.debug("Processing snapshot {} to filter relevant SST Files",
                 snapShotTableKey);
 
-            List<Pair<String, String>> prefixPairs = constructPrefixPairs(
-                snapshotInfo);
+            Map<String, String> prefixPairs = getColumnFamilyToPrefixMap(

Review Comment:
   nit: Change variable to something like columnFamilyNameToPrefixMap



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/CompactionLogEntry.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ozone.rocksdiff;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.hdds.StringUtils;
+import 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.CompactionLogEntryProto;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedOptions;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedReadOptions;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.SstFileReader;
+import org.rocksdb.SstFileReaderIterator;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static 
org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.SST_FILE_EXTENSION_LENGTH;
+
+/**
+ * Compaction log entry Dao to write to the compaction log file.
+ */
+public class CompactionLogEntry {
+  private final long dbSequenceNumber;
+  private final List<FileInfo> inputFileInfoList;
+  private final List<FileInfo> outputFileInfoList;
+
+  public CompactionLogEntry(long dbSequenceNumber,
+                            List<FileInfo> inputFileInfoList,
+                            List<FileInfo> outputFileInfoList) {
+    this.dbSequenceNumber = dbSequenceNumber;
+    this.inputFileInfoList = inputFileInfoList;
+    this.outputFileInfoList = outputFileInfoList;
+  }
+
+  public List<FileInfo> getInputFileInfoList() {
+    return inputFileInfoList;
+  }
+
+  public List<FileInfo> getOutputFileInfoList() {
+    return outputFileInfoList;
+  }
+
+  public long getDbSequenceNumber() {
+    return dbSequenceNumber;
+  }
+
+  public CompactionLogEntryProto getProtobuf() {
+    CompactionLogEntryProto.Builder builder = CompactionLogEntryProto
+        .newBuilder()
+        .setDbSequenceNumber(dbSequenceNumber);
+
+    if (inputFileInfoList != null) {
+      inputFileInfoList.forEach(fileInfo ->
+          builder.addInputFileIntoList(fileInfo.getProtobuf()));
+    }
+
+    if (outputFileInfoList != null) {
+      outputFileInfoList.forEach(fileInfo ->
+          builder.addOutputFileIntoList(fileInfo.getProtobuf()));
+    }
+
+    return builder.build();
+  }
+
+  public static CompactionLogEntry getFromProtobuf(
+      CompactionLogEntryProto proto) {
+    List<FileInfo> inputFileInfo = proto.getInputFileIntoListList().stream()
+        .map(FileInfo::getFromProtobuf)
+        .collect(Collectors.toList());
+
+    List<FileInfo> outputFileInfo = proto.getOutputFileIntoListList().stream()
+        .map(FileInfo::getFromProtobuf)
+        .collect(Collectors.toList());
+
+
+    return new CompactionLogEntry(proto.getDbSequenceNumber(),
+        inputFileInfo, outputFileInfo);
+  }
+
+  public String toEncodedString() {
+    // Encoding is used to deal with \n. Protobuf appends \n after each
+    // parameter. If ByteArray is simply converted to a string or
+    // protobuf.toString(), it will contain \n and will be added to the log.
+    // Which creates a problem while reading compaction logs.
+    // Compaction log lines are read sequentially assuming each line is one
+    // compaction log entry.
+    return Base64.getEncoder().encodeToString(getProtobuf().toByteArray());
+  }
+
+  public static CompactionLogEntry fromEncodedString(String string) {
+    try {
+      byte[] decode = Base64.getDecoder().decode(string);
+      return getFromProtobuf(CompactionLogEntryProto.parseFrom(decode));
+    } catch (InvalidProtocolBufferException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return String.format("dbSequenceNumber: '%s', inputFileInfoList: '%s'," +
+            " outputFileInfoList: '%s',", dbSequenceNumber, inputFileInfoList,
+        outputFileInfoList);
+  }
+
+  public static CompactionLogEntry fromCompactionFiles(
+      long dbSequenceNumber,
+      List<String> inputFiles,
+      List<String> outputFiles
+  ) {
+
+    try (ManagedOptions options = new ManagedOptions();
+         ManagedReadOptions readOptions = new ManagedReadOptions()) {
+      List<FileInfo> inputFileInfos = convertFileInfo(inputFiles, options,
+          readOptions);
+      List<FileInfo> outputFileInfos = convertFileInfo(outputFiles, options,
+          readOptions);
+      return new CompactionLogEntry(dbSequenceNumber, inputFileInfos,
+          outputFileInfos);
+    }
+  }
+
+  private static List<FileInfo> convertFileInfo(
+      List<String> sstFiles,
+      ManagedOptions options,
+      ManagedReadOptions readOptions
+  ) {
+    if (CollectionUtils.isEmpty(sstFiles)) {
+      return Collections.emptyList();
+    }
+
+    final int fileNameOffset = sstFiles.get(0).lastIndexOf("/") + 1;
+    List<FileInfo> response = new ArrayList<>();
+
+    for (String sstFile : sstFiles) {
+      String fileName = sstFile.substring(fileNameOffset,
+          sstFile.length() - SST_FILE_EXTENSION_LENGTH);
+      SstFileReader fileReader = new SstFileReader(options);
+      try {
+        fileReader.open(sstFile);
+        String columnFamily = StringUtils.bytes2String(
+            fileReader.getTableProperties().getColumnFamilyName());
+        SstFileReaderIterator iterator = fileReader.newIterator(readOptions);

Review Comment:
   Don't need an iterator. You can probably get this info from manifest file. 
On doesn't need to open each and every sst file. Can you see you can get a 
rocks db instance here if it is AOS. This would be more optimized.



##########
hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/CompactionLogEntry.java:
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ozone.rocksdiff;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.hdds.StringUtils;
+import 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.CompactionLogEntryProto;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedOptions;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedReadOptions;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.SstFileReader;
+import org.rocksdb.SstFileReaderIterator;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static 
org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.SST_FILE_EXTENSION_LENGTH;
+
+/**
+ * Compaction log entry Dao to write to the compaction log file.
+ */
+public class CompactionLogEntry {
+  private final long dbSequenceNumber;
+  private final List<FileInfo> inputFileInfoList;
+  private final List<FileInfo> outputFileInfoList;
+
+  public CompactionLogEntry(long dbSequenceNumber,
+                            List<FileInfo> inputFileInfoList,
+                            List<FileInfo> outputFileInfoList) {
+    this.dbSequenceNumber = dbSequenceNumber;
+    this.inputFileInfoList = inputFileInfoList;
+    this.outputFileInfoList = outputFileInfoList;
+  }
+
+  public List<FileInfo> getInputFileInfoList() {
+    return inputFileInfoList;
+  }
+
+  public List<FileInfo> getOutputFileInfoList() {
+    return outputFileInfoList;
+  }
+
+  public long getDbSequenceNumber() {
+    return dbSequenceNumber;
+  }
+
+  public CompactionLogEntryProto getProtobuf() {
+    CompactionLogEntryProto.Builder builder = CompactionLogEntryProto
+        .newBuilder()
+        .setDbSequenceNumber(dbSequenceNumber);
+
+    if (inputFileInfoList != null) {
+      inputFileInfoList.forEach(fileInfo ->
+          builder.addInputFileIntoList(fileInfo.getProtobuf()));
+    }
+
+    if (outputFileInfoList != null) {
+      outputFileInfoList.forEach(fileInfo ->
+          builder.addOutputFileIntoList(fileInfo.getProtobuf()));
+    }
+
+    return builder.build();
+  }
+
+  public static CompactionLogEntry getFromProtobuf(
+      CompactionLogEntryProto proto) {
+    List<FileInfo> inputFileInfo = proto.getInputFileIntoListList().stream()
+        .map(FileInfo::getFromProtobuf)
+        .collect(Collectors.toList());
+
+    List<FileInfo> outputFileInfo = proto.getOutputFileIntoListList().stream()
+        .map(FileInfo::getFromProtobuf)
+        .collect(Collectors.toList());
+
+
+    return new CompactionLogEntry(proto.getDbSequenceNumber(),
+        inputFileInfo, outputFileInfo);
+  }
+
+  public String toEncodedString() {
+    // Encoding is used to deal with \n. Protobuf appends \n after each
+    // parameter. If ByteArray is simply converted to a string or
+    // protobuf.toString(), it will contain \n and will be added to the log.
+    // Which creates a problem while reading compaction logs.
+    // Compaction log lines are read sequentially assuming each line is one
+    // compaction log entry.
+    return Base64.getEncoder().encodeToString(getProtobuf().toByteArray());
+  }
+
+  public static CompactionLogEntry fromEncodedString(String string) {
+    try {
+      byte[] decode = Base64.getDecoder().decode(string);
+      return getFromProtobuf(CompactionLogEntryProto.parseFrom(decode));
+    } catch (InvalidProtocolBufferException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return String.format("dbSequenceNumber: '%s', inputFileInfoList: '%s'," +
+            " outputFileInfoList: '%s',", dbSequenceNumber, inputFileInfoList,
+        outputFileInfoList);
+  }
+
+  public static CompactionLogEntry fromCompactionFiles(
+      long dbSequenceNumber,
+      List<String> inputFiles,
+      List<String> outputFiles
+  ) {
+
+    try (ManagedOptions options = new ManagedOptions();
+         ManagedReadOptions readOptions = new ManagedReadOptions()) {
+      List<FileInfo> inputFileInfos = convertFileInfo(inputFiles, options,
+          readOptions);
+      List<FileInfo> outputFileInfos = convertFileInfo(outputFiles, options,
+          readOptions);
+      return new CompactionLogEntry(dbSequenceNumber, inputFileInfos,
+          outputFileInfos);
+    }
+  }
+
+  private static List<FileInfo> convertFileInfo(
+      List<String> sstFiles,
+      ManagedOptions options,
+      ManagedReadOptions readOptions
+  ) {
+    if (CollectionUtils.isEmpty(sstFiles)) {
+      return Collections.emptyList();
+    }
+
+    final int fileNameOffset = sstFiles.get(0).lastIndexOf("/") + 1;
+    List<FileInfo> response = new ArrayList<>();
+
+    for (String sstFile : sstFiles) {
+      String fileName = sstFile.substring(fileNameOffset,
+          sstFile.length() - SST_FILE_EXTENSION_LENGTH);
+      SstFileReader fileReader = new SstFileReader(options);
+      try {
+        fileReader.open(sstFile);
+        String columnFamily = StringUtils.bytes2String(
+            fileReader.getTableProperties().getColumnFamilyName());
+        SstFileReaderIterator iterator = fileReader.newIterator(readOptions);

Review Comment:
   onCompactionCompleted function has the rocks db instance. We can use that to 
read it from the manifest file getting liveMetadataFile for sst file.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to