This is an automated email from the ASF dual-hosted git repository.

hemant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 4788798724 HDDS-7601. Added new columnFamily: compactionLogTable to 
store compaction entries (#5303)
4788798724 is described below

commit 47887987244ba42ba6317b9db37a5b6f7c6e24f4
Author: Hemant Kumar <[email protected]>
AuthorDate: Mon Sep 18 18:45:26 2023 -0700

    HDDS-7601. Added new columnFamily: compactionLogTable to store compaction 
entries (#5303)
---
 .../interface-client/src/main/proto/hdds.proto     |  17 +-
 .../ozone/compaction/log/CompactionFileInfo.java   | 164 +++++++++++++++
 .../ozone/compaction/log/CompactionLogEntry.java   | 193 ++++++++++++++++++
 .../apache/ozone/compaction/log/package-info.java  |  23 +++
 .../ozone/rocksdiff/RocksDBCheckpointDiffer.java   |   2 +-
 .../compaction/log/TestCompactionFileInfo.java     | 220 +++++++++++++++++++++
 .../compaction/log/TestCompactionLogEntry.java     | 146 ++++++++++++++
 .../apache/hadoop/ozone/om/OMMetadataManager.java  |   2 +
 .../hadoop/ozone/om/OmMetadataManagerImpl.java     |  23 ++-
 .../hadoop/ozone/om/codec/OMDBDefinition.java      |  11 ++
 10 files changed, 797 insertions(+), 4 deletions(-)

diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto 
b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
index 1479daa1c6..5c20745c06 100644
--- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
@@ -479,4 +479,19 @@ message DeletedBlocksTransactionInfo {
     optional int64 containerID = 2;
     repeated int64 localID = 3;
     optional int32 count = 4;
-}
\ No newline at end of file
+}
+
+message CompactionFileInfoProto {
+    optional string fileName = 1;
+    optional string startKey = 2;
+    optional string endKey = 3;
+    optional string columnFamily = 4;
+}
+
+message CompactionLogEntryProto {
+    optional uint64 dbSequenceNumber = 1;
+    optional uint64 compactionTime = 2;
+    repeated CompactionFileInfoProto inputFileIntoList = 3;
+    repeated CompactionFileInfoProto outputFileIntoList = 4;
+    optional string compactionReason = 5;
+}
diff --git 
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/compaction/log/CompactionFileInfo.java
 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/compaction/log/CompactionFileInfo.java
new file mode 100644
index 0000000000..68a9363b05
--- /dev/null
+++ 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/compaction/log/CompactionFileInfo.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ozone.compaction.log;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.util.Preconditions;
+
+import java.util.Objects;
+
+/**
+ * Dao to keep SST file information in the compaction log.
+ */
+public final class CompactionFileInfo {
+  private final String fileName;
+  private final String startKey;
+  private final String endKey;
+  private final String columnFamily;
+
+  private CompactionFileInfo(String fileName,
+                             String startRange,
+                             String endRange,
+                             String columnFamily) {
+    this.fileName = fileName;
+    this.startKey = startRange;
+    this.endKey = endRange;
+    this.columnFamily = columnFamily;
+  }
+
+  public String getFileName() {
+    return fileName;
+  }
+
+  public String getStartKey() {
+    return startKey;
+  }
+
+  public String getEndKey() {
+    return endKey;
+  }
+
+  public String getColumnFamily() {
+    return columnFamily;
+  }
+
+  public HddsProtos.CompactionFileInfoProto getProtobuf() {
+    HddsProtos.CompactionFileInfoProto.Builder builder =
+        HddsProtos.CompactionFileInfoProto.newBuilder()
+            .setFileName(fileName);
+    if (startKey != null) {
+      builder = builder.setStartKey(startKey);
+    }
+    if (endKey != null) {
+      builder = builder.setEndKey(endKey);
+    }
+    if (columnFamily != null) {
+      builder = builder.setColumnFamily(columnFamily);
+    }
+    return builder.build();
+  }
+
+  public static CompactionFileInfo getFromProtobuf(
+      HddsProtos.CompactionFileInfoProto proto) {
+    Builder builder = new Builder(proto.getFileName());
+
+    if (proto.hasStartKey()) {
+      builder.setStartRange(proto.getStartKey());
+    }
+    if (proto.hasEndKey()) {
+      builder.setEndRange(proto.getEndKey());
+    }
+    if (proto.hasColumnFamily()) {
+      builder.setColumnFamily(proto.getColumnFamily());
+    }
+
+    return builder.build();
+  }
+
+  @Override
+  public String toString() {
+    return String.format("fileName: '%s', startKey: '%s', endKey: '%s'," +
+        " columnFamily: '%s'", fileName, startKey, endKey, columnFamily);
+  }
+
+  /**
+   * Builder of CompactionFileInfo.
+   */
+  public static class Builder {
+    private final String fileName;
+    private String startRange;
+    private String endRange;
+    private String columnFamily;
+
+    public Builder(String fileName) {
+      Preconditions.checkNotNull(fileName, "FileName is required parameter.");
+      this.fileName = fileName;
+    }
+
+    public Builder setStartRange(String startRange) {
+      this.startRange = startRange;
+      return this;
+    }
+
+    public Builder setEndRange(String endRange) {
+      this.endRange = endRange;
+      return this;
+    }
+
+    public Builder setColumnFamily(String columnFamily) {
+      this.columnFamily = columnFamily;
+      return this;
+    }
+
+    public CompactionFileInfo build() {
+      if ((startRange != null || endRange != null || columnFamily != null) &&
+          (startRange == null || endRange == null || columnFamily == null)) {
+        throw new IllegalArgumentException(
+            String.format("Either all of startRange, endRange and " +
+                    "columnFamily should be non-null or null. " +
+                    "startRange: '%s', endRange: '%s', columnFamily: '%s'.",
+                startRange, endRange, columnFamily));
+      }
+
+      return new CompactionFileInfo(fileName, startRange, endRange,
+          columnFamily);
+    }
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof CompactionFileInfo)) {
+      return false;
+    }
+
+    CompactionFileInfo that = (CompactionFileInfo) o;
+    return Objects.equals(fileName, that.fileName) &&
+        Objects.equals(startKey, that.startKey) &&
+        Objects.equals(endKey, that.endKey) &&
+        Objects.equals(columnFamily, that.columnFamily);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(fileName, startKey, endKey, columnFamily);
+  }
+}
diff --git 
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/compaction/log/CompactionLogEntry.java
 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/compaction/log/CompactionLogEntry.java
new file mode 100644
index 0000000000..41a003515d
--- /dev/null
+++ 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/compaction/log/CompactionLogEntry.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ozone.compaction.log;
+
+import 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.CompactionLogEntryProto;
+import org.apache.hadoop.hdds.utils.db.Codec;
+import org.apache.hadoop.hdds.utils.db.CopyObject;
+import org.apache.hadoop.hdds.utils.db.DelegatedCodec;
+import org.apache.hadoop.hdds.utils.db.Proto2Codec;
+import org.apache.hadoop.util.Preconditions;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Compaction log entry Dao to write to the compaction log file.
+ */
+public final class CompactionLogEntry implements
+    CopyObject<CompactionLogEntry> {
+  private static final Codec<CompactionLogEntry> CODEC = new DelegatedCodec<>(
+      Proto2Codec.get(CompactionLogEntryProto.class),
+      CompactionLogEntry::getFromProtobuf,
+      CompactionLogEntry::getProtobuf);
+
+  public static Codec<CompactionLogEntry> getCodec() {
+    return CODEC;
+  }
+
+  private final long dbSequenceNumber;
+  private final long compactionTime;
+  private final List<CompactionFileInfo> inputFileInfoList;
+  private final List<CompactionFileInfo> outputFileInfoList;
+  private final String compactionReason;
+
+  private CompactionLogEntry(long dbSequenceNumber,
+                            long compactionTime,
+                            List<CompactionFileInfo> inputFileInfoList,
+                            List<CompactionFileInfo> outputFileInfoList,
+                            String compactionReason) {
+    this.dbSequenceNumber = dbSequenceNumber;
+    this.compactionTime = compactionTime;
+    this.inputFileInfoList = inputFileInfoList;
+    this.outputFileInfoList = outputFileInfoList;
+    this.compactionReason = compactionReason;
+  }
+
+  public List<CompactionFileInfo> getInputFileInfoList() {
+    return inputFileInfoList;
+  }
+
+  public List<CompactionFileInfo> getOutputFileInfoList() {
+    return outputFileInfoList;
+  }
+
+  public long getDbSequenceNumber() {
+    return dbSequenceNumber;
+  }
+
+  public long getCompactionTime() {
+    return compactionTime;
+  }
+
+  public String getCompactionReason() {
+    return compactionReason;
+  }
+
+  public CompactionLogEntryProto getProtobuf() {
+    CompactionLogEntryProto.Builder builder = CompactionLogEntryProto
+        .newBuilder()
+        .setDbSequenceNumber(dbSequenceNumber)
+        .setCompactionTime(compactionTime);
+
+    if (compactionReason != null) {
+      builder.setCompactionReason(compactionReason);
+    }
+
+    inputFileInfoList.forEach(fileInfo ->
+        builder.addInputFileIntoList(fileInfo.getProtobuf()));
+
+    outputFileInfoList.forEach(fileInfo ->
+        builder.addOutputFileIntoList(fileInfo.getProtobuf()));
+
+    return builder.build();
+  }
+
+  public static CompactionLogEntry getFromProtobuf(
+      CompactionLogEntryProto proto) {
+    List<CompactionFileInfo> inputFileInfo = proto.getInputFileIntoListList()
+        .stream()
+        .map(CompactionFileInfo::getFromProtobuf)
+        .collect(Collectors.toList());
+
+    List<CompactionFileInfo> outputFileInfo = proto.getOutputFileIntoListList()
+        .stream()
+        .map(CompactionFileInfo::getFromProtobuf)
+        .collect(Collectors.toList());
+    Builder builder = new Builder(proto.getDbSequenceNumber(),
+        proto.getCompactionTime(), inputFileInfo, outputFileInfo);
+
+    if (proto.hasCompactionReason()) {
+      builder.setCompactionReason(proto.getCompactionReason());
+    }
+    return builder.build();
+  }
+
+  @Override
+  public String toString() {
+    return String.format("dbSequenceNumber: '%s', compactionTime: '%s', " +
+            "inputFileInfoList: '%s', outputFileInfoList: '%s', " +
+            "compactionReason: '%s'.", dbSequenceNumber, compactionTime,
+        inputFileInfoList, outputFileInfoList, compactionReason);
+  }
+
+  /**
+   * Builder of CompactionLogEntry.
+   */
+  public static class Builder {
+    private final long dbSequenceNumber;
+    private final long compactionTime;
+    private final List<CompactionFileInfo> inputFileInfoList;
+    private final List<CompactionFileInfo> outputFileInfoList;
+    private String compactionReason;
+
+    public Builder(long dbSequenceNumber, long compactionTime,
+                   List<CompactionFileInfo> inputFileInfoList,
+                   List<CompactionFileInfo> outputFileInfoList) {
+      Preconditions.checkNotNull(inputFileInfoList,
+          "inputFileInfoList is required parameter.");
+      Preconditions.checkNotNull(outputFileInfoList,
+          "outputFileInfoList is required parameter.");
+      this.dbSequenceNumber = dbSequenceNumber;
+      this.compactionTime = compactionTime;
+      this.inputFileInfoList = inputFileInfoList;
+      this.outputFileInfoList = outputFileInfoList;
+    }
+
+    public Builder setCompactionReason(String compactionReason) {
+      this.compactionReason = compactionReason;
+      return this;
+    }
+
+    public CompactionLogEntry build() {
+      return new CompactionLogEntry(dbSequenceNumber, compactionTime,
+          inputFileInfoList, outputFileInfoList, compactionReason);
+    }
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof CompactionLogEntry)) {
+      return false;
+    }
+
+    CompactionLogEntry that = (CompactionLogEntry) o;
+    return dbSequenceNumber == that.dbSequenceNumber &&
+        compactionTime == that.compactionTime &&
+        Objects.equals(inputFileInfoList, that.inputFileInfoList) &&
+        Objects.equals(outputFileInfoList, that.outputFileInfoList) &&
+        Objects.equals(compactionReason, that.compactionReason);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(dbSequenceNumber, compactionTime, inputFileInfoList,
+        outputFileInfoList, compactionReason);
+  }
+
+  @Override
+  public CompactionLogEntry copyObject() {
+    return new CompactionLogEntry(dbSequenceNumber, compactionTime,
+        inputFileInfoList, outputFileInfoList, compactionReason);
+  }
+}
diff --git 
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/compaction/log/package-info.java
 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/compaction/log/package-info.java
new file mode 100644
index 0000000000..db93d83966
--- /dev/null
+++ 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/compaction/log/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ozone.compaction.log;
+
+/**
+ * This package contains POJO classes for Compaction information.
+ */
diff --git 
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
index fcc09eb28a..d8fe2d50ab 100644
--- 
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
+++ 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
@@ -165,7 +165,7 @@ public class RocksDBCheckpointDiffer implements 
AutoCloseable,
    * to save space.
    */
   static final String SST_FILE_EXTENSION = ".sst";
-  private static final int SST_FILE_EXTENSION_LENGTH =
+  public static final int SST_FILE_EXTENSION_LENGTH =
       SST_FILE_EXTENSION.length();
 
   private static final int LONG_MAX_STR_LEN =
diff --git 
a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/compaction/log/TestCompactionFileInfo.java
 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/compaction/log/TestCompactionFileInfo.java
new file mode 100644
index 0000000000..6de7330b13
--- /dev/null
+++ 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/compaction/log/TestCompactionFileInfo.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ozone.compaction.log;
+
+import 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.CompactionFileInfoProto;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+/**
+ * Test class for CompactionFileInfo.
+ */
+public class TestCompactionFileInfo {
+
+  private static Stream<Arguments> compactionFileInfoValidScenarios() {
+    return Stream.of(
+        Arguments.of("All parameters are present.",
+            "fileName",
+            "startRange",
+            "endRange",
+            "columnFamily"
+        ),
+        Arguments.of("Only fileName is present.",
+            "fileName",
+            null,
+            null,
+            null
+        )
+    );
+  }
+
+  @ParameterizedTest(name = "{0}")
+  @MethodSource("compactionFileInfoValidScenarios")
+  public void testCompactionFileInfoValidScenario(String description,
+                                                  String fileName,
+                                                  String startRange,
+                                                  String endRange,
+                                                  String columnFamily) {
+
+    CompactionFileInfo compactionFileInfo =
+        new CompactionFileInfo.Builder(fileName).setStartRange(startRange)
+            .setEndRange(endRange).setColumnFamily(columnFamily).build();
+    Assertions.assertNotNull(compactionFileInfo);
+  }
+
+  private static Stream<Arguments> compactionFileInfoInvalidScenarios() {
+    return Stream.of(
+        Arguments.of("All parameters are null.",
+            null,
+            null,
+            null,
+            null,
+            "FileName is required parameter."
+        ),
+        Arguments.of("fileName is null.",
+            null,
+            "startRange",
+            "endRange",
+            "columnFamily",
+            "FileName is required parameter."
+        ),
+        Arguments.of("startRange is not present.",
+            "fileName",
+            null,
+            "endRange",
+            "columnFamily",
+            "Either all of startRange, endRange and columnFamily" +
+                " should be non-null or null. startRange: 'null', " +
+                "endRange: 'endRange', columnFamily: 'columnFamily'."
+        ),
+        Arguments.of("endRange is not present.",
+            "fileName",
+            "startRange",
+            null,
+            "columnFamily",
+            "Either all of startRange, endRange and columnFamily" +
+                " should be non-null or null. startRange: 'startRange', " +
+                "endRange: 'null', columnFamily: 'columnFamily'."
+        ),
+        Arguments.of("columnFamily is not present.",
+            "fileName",
+            "startRange",
+            "endRange",
+            null,
+            "Either all of startRange, endRange and columnFamily" +
+                " should be non-null or null. startRange: 'startRange', " +
+                "endRange: 'endRange', columnFamily: 'null'."
+        ),
+        Arguments.of("startRange and endRange are not present.",
+            "fileName",
+            null,
+            null,
+            "columnFamily",
+            "Either all of startRange, endRange and columnFamily" +
+                " should be non-null or null. startRange: 'null', " +
+                "endRange: 'null', columnFamily: 'columnFamily'."
+        ),
+        Arguments.of("endRange and columnFamily are not present.",
+            "fileName",
+            "startRange",
+            null,
+            null,
+            "Either all of startRange, endRange and columnFamily " +
+                "should be non-null or null. startRange: 'startRange', " +
+                "endRange: 'null', columnFamily: 'null'."
+        ),
+        Arguments.of("startRange and columnFamily are not present.",
+            "fileName",
+            null,
+            "endRange",
+            null,
+            "Either all of startRange, endRange and columnFamily" +
+                " should be non-null or null. startRange: 'null', " +
+                "endRange: 'endRange', columnFamily: 'null'."
+        )
+    );
+  }
+
+  @ParameterizedTest(name = "{0}")
+  @MethodSource("compactionFileInfoInvalidScenarios")
+  public void testCompactionFileInfoInvalidScenario(String description,
+                                                    String fileName,
+                                                    String startRange,
+                                                    String endRange,
+                                                    String columnFamily,
+                                                    String expectedMessage) {
+    RuntimeException exception = 
Assertions.assertThrows(RuntimeException.class,
+        () -> new 
CompactionFileInfo.Builder(fileName).setStartRange(startRange)
+            .setEndRange(endRange).setColumnFamily(columnFamily).build());
+    assertEquals(expectedMessage, exception.getMessage());
+  }
+
+  @ParameterizedTest(name = "{0}")
+  @MethodSource("compactionFileInfoValidScenarios")
+  public void testGetProtobuf(String description,
+                              String fileName,
+                              String startRange,
+                              String endRange,
+                              String columnFamily) {
+    CompactionFileInfo compactionFileInfo = new CompactionFileInfo
+        .Builder(fileName)
+        .setStartRange(startRange)
+        .setEndRange(endRange)
+        .setColumnFamily(columnFamily)
+        .build();
+
+    CompactionFileInfoProto protobuf = compactionFileInfo.getProtobuf();
+    assertEquals(fileName, protobuf.getFileName());
+
+    if (startRange != null) {
+      assertEquals(startRange, protobuf.getStartKey());
+    } else {
+      assertFalse(protobuf.hasStartKey());
+    }
+    if (endRange != null) {
+      assertEquals(endRange, protobuf.getEndKey());
+    } else {
+      assertFalse(protobuf.hasEndKey());
+    }
+    if (columnFamily != null) {
+      assertEquals(columnFamily, protobuf.getColumnFamily());
+    } else {
+      assertFalse(protobuf.hasColumnFamily());
+    }
+  }
+
+  @ParameterizedTest(name = "{0}")
+  @MethodSource("compactionFileInfoValidScenarios")
+  public void testFromProtobuf(String description,
+                               String fileName,
+                               String startRange,
+                               String endRange,
+                               String columnFamily) {
+    CompactionFileInfoProto.Builder builder = CompactionFileInfoProto
+        .newBuilder()
+        .setFileName(fileName);
+
+    if (startRange != null) {
+      builder = builder.setStartKey(startRange);
+    }
+    if (endRange != null) {
+      builder = builder.setEndKey(endRange);
+    }
+    if (columnFamily != null) {
+      builder = builder.setColumnFamily(columnFamily);
+    }
+
+    CompactionFileInfoProto protobuf = builder.build();
+
+    CompactionFileInfo compactionFileInfo =
+        CompactionFileInfo.getFromProtobuf(protobuf);
+
+    assertEquals(fileName, compactionFileInfo.getFileName());
+    assertEquals(startRange, compactionFileInfo.getStartKey());
+    assertEquals(endRange, compactionFileInfo.getEndKey());
+    assertEquals(columnFamily, compactionFileInfo.getColumnFamily());
+  }
+}
diff --git 
a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/compaction/log/TestCompactionLogEntry.java
 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/compaction/log/TestCompactionLogEntry.java
new file mode 100644
index 0000000000..bdbdac9403
--- /dev/null
+++ 
b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/compaction/log/TestCompactionLogEntry.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ozone.compaction.log;
+
+import 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.CompactionLogEntryProto;
+import org.apache.hadoop.util.Time;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * Test class for CompactionLogEntry.
+ */
+public class TestCompactionLogEntry {
+
+  private static Stream<Arguments> compactionLogEntryValidScenarios() {
+    List<CompactionFileInfo> inputFiles = Arrays.asList(
+        new CompactionFileInfo.Builder("inputFileName1").setStartRange("key1")
+            .setEndRange("key5").setColumnFamily("columnFamily").build(),
+        new CompactionFileInfo.Builder("inputFileName2").setStartRange("key6")
+            .setEndRange("key11").setColumnFamily("columnFamily").build(),
+        new CompactionFileInfo.Builder("inputFileName3").setStartRange("key12")
+            .setEndRange("key19").setColumnFamily("columnFamily").build());
+    List<CompactionFileInfo> outputFiles = Arrays.asList(
+        new CompactionFileInfo.Builder("outputFileName1").setStartRange("key1")
+            .setEndRange("key8").setColumnFamily("columnFamily").build(),
+        new CompactionFileInfo.Builder("outputFileName2").setStartRange("key9")
+            .setEndRange("key19").setColumnFamily("columnFamily").build());
+
+    return Stream.of(
+        Arguments.of("With compaction reason.",
+            1000,
+            Time.now(),
+            inputFiles,
+            outputFiles,
+            "compactionReason"
+        ),
+        Arguments.of("Without compaction reason.",
+            2000,
+            Time.now(),
+            inputFiles,
+            outputFiles,
+            null
+        )
+    );
+  }
+
+  @ParameterizedTest(name = "{0}")
+  @MethodSource("compactionLogEntryValidScenarios")
+  public void testGetProtobuf(
+      String description,
+      long dbSequenceNumber,
+      long compactionTime,
+      List<CompactionFileInfo> inputFiles,
+      List<CompactionFileInfo> outputFiles,
+      String compactionReason) {
+    CompactionLogEntry.Builder builder = new CompactionLogEntry
+        .Builder(dbSequenceNumber, compactionTime, inputFiles, outputFiles);
+
+    if (compactionReason != null) {
+      builder.setCompactionReason(compactionReason);
+    }
+
+    CompactionLogEntry compactionLogEntry = builder.build();
+    assertNotNull(compactionLogEntry);
+    CompactionLogEntryProto protobuf =
+        compactionLogEntry.getProtobuf();
+    assertEquals(dbSequenceNumber, protobuf.getDbSequenceNumber());
+    assertEquals(compactionTime, protobuf.getCompactionTime());
+    assertEquals(inputFiles, protobuf.getInputFileIntoListList().stream()
+        .map(CompactionFileInfo::getFromProtobuf)
+        .collect(Collectors.toList()));
+    assertEquals(outputFiles, protobuf.getOutputFileIntoListList().stream()
+        .map(CompactionFileInfo::getFromProtobuf)
+        .collect(Collectors.toList()));
+    if (compactionReason != null) {
+      assertEquals(compactionReason, protobuf.getCompactionReason());
+    } else {
+      assertFalse(protobuf.hasCompactionReason());
+    }
+  }
+
+
+  @ParameterizedTest(name = "{0}")
+  @MethodSource("compactionLogEntryValidScenarios")
+  public void testFromProtobuf(
+      String description,
+      long dbSequenceNumber,
+      long compactionTime,
+      List<CompactionFileInfo> inputFiles,
+      List<CompactionFileInfo> outputFiles,
+      String compactionReason) {
+
+    CompactionLogEntryProto.Builder builder = CompactionLogEntryProto
+        .newBuilder()
+        .setDbSequenceNumber(dbSequenceNumber)
+        .setCompactionTime(compactionTime);
+
+    if (compactionReason != null) {
+      builder.setCompactionReason(compactionReason);
+    }
+
+    inputFiles.forEach(fileInfo ->
+        builder.addInputFileIntoList(fileInfo.getProtobuf()));
+
+    outputFiles.forEach(fileInfo ->
+        builder.addOutputFileIntoList(fileInfo.getProtobuf()));
+
+    CompactionLogEntryProto protobuf1 = builder.build();
+
+    CompactionLogEntry compactionLogEntry =
+        CompactionLogEntry.getFromProtobuf(protobuf1);
+
+    assertNotNull(compactionLogEntry);
+    assertEquals(dbSequenceNumber, compactionLogEntry.getDbSequenceNumber());
+    assertEquals(compactionTime, compactionLogEntry.getCompactionTime());
+    assertEquals(inputFiles, compactionLogEntry.getInputFileInfoList());
+    assertEquals(outputFiles, compactionLogEntry.getOutputFileInfoList());
+    assertEquals(compactionReason, compactionLogEntry.getCompactionReason());
+  }
+}
diff --git 
a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
 
b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
index 3dcabf088e..0832344dce 100644
--- 
a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
+++ 
b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hdds.utils.db.DBStore;
 import org.apache.hadoop.hdds.utils.db.Table;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.ozone.compaction.log.CompactionLogEntry;
 
 /**
  * OM metadata manager interface.
@@ -378,6 +379,7 @@ public interface OMMetadataManager extends DBStoreHAManager 
{
 
   Table<String, String> getSnapshotRenamedTable();
 
+  Table<String, CompactionLogEntry> getCompactionLogTable();
   /**
    * Gets the OM Meta table.
    * @return meta table reference.
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index 2aee67e91c..512ff167b6 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -109,6 +109,7 @@ import static 
org.apache.hadoop.ozone.om.service.SnapshotDeletingService.isBlock
 import static 
org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.checkSnapshotDirExist;
 
 import org.apache.hadoop.util.Time;
+import org.apache.ozone.compaction.log.CompactionLogEntry;
 import org.apache.ratis.util.ExitUtils;
 import org.eclipse.jetty.util.StringUtil;
 import org.slf4j.Logger;
@@ -197,6 +198,8 @@ public class OmMetadataManagerImpl implements 
OMMetadataManager,
    * |                       |  2. /volumeId/bucketId/parentId/fileName        
|
    * |                       |  3. /volumeName/bucketName/keyName              
|
    * 
|-------------------------------------------------------------------------|
+   * | compactionLogTable    | dbTrxId-compactionTime -> compactionLogEntry    
|
+   * 
|-------------------------------------------------------------------------|
    */
 
   public static final String USER_TABLE = "userTable";
@@ -225,6 +228,8 @@ public class OmMetadataManagerImpl implements 
OMMetadataManager,
   public static final String SNAPSHOT_INFO_TABLE = "snapshotInfoTable";
   public static final String SNAPSHOT_RENAMED_TABLE =
       "snapshotRenamedTable";
+  public static final String COMPACTION_LOG_TABLE =
+      "compactionLogTable";
 
   static final String[] ALL_TABLES = new String[] {
       USER_TABLE,
@@ -247,7 +252,8 @@ public class OmMetadataManagerImpl implements 
OMMetadataManager,
       PRINCIPAL_TO_ACCESS_IDS_TABLE,
       TENANT_STATE_TABLE,
       SNAPSHOT_INFO_TABLE,
-      SNAPSHOT_RENAMED_TABLE
+      SNAPSHOT_RENAMED_TABLE,
+      COMPACTION_LOG_TABLE
   };
 
   private DBStore store;
@@ -277,6 +283,7 @@ public class OmMetadataManagerImpl implements 
OMMetadataManager,
 
   private Table snapshotInfoTable;
   private Table snapshotRenamedTable;
+  private Table compactionLogTable;
 
   private boolean isRatisEnabled;
   private boolean ignorePipelineinKey;
@@ -615,6 +622,7 @@ public class OmMetadataManagerImpl implements 
OMMetadataManager,
         .addTable(TENANT_STATE_TABLE)
         .addTable(SNAPSHOT_INFO_TABLE)
         .addTable(SNAPSHOT_RENAMED_TABLE)
+        .addTable(COMPACTION_LOG_TABLE)
         .addCodec(OzoneTokenIdentifier.class, TokenIdentifierCodec.get())
         .addCodec(OmKeyInfo.class, OmKeyInfo.getCodec(true))
         .addCodec(RepeatedOmKeyInfo.class, RepeatedOmKeyInfo.getCodec(true))
@@ -629,7 +637,8 @@ public class OmMetadataManagerImpl implements 
OMMetadataManager,
         .addCodec(OmDBTenantState.class, OmDBTenantState.getCodec())
         .addCodec(OmDBAccessIdInfo.class, OmDBAccessIdInfo.getCodec())
         .addCodec(OmDBUserPrincipalInfo.class, 
OmDBUserPrincipalInfo.getCodec())
-        .addCodec(SnapshotInfo.class, SnapshotInfo.getCodec());
+        .addCodec(SnapshotInfo.class, SnapshotInfo.getCodec())
+        .addCodec(CompactionLogEntry.class, CompactionLogEntry.getCodec());
   }
 
   /**
@@ -742,6 +751,11 @@ public class OmMetadataManagerImpl implements 
OMMetadataManager,
     checkTableStatus(snapshotRenamedTable, SNAPSHOT_RENAMED_TABLE,
         addCacheMetrics);
     // TODO: [SNAPSHOT] Initialize table lock for snapshotRenamedTable.
+
+    compactionLogTable = this.store.getTable(COMPACTION_LOG_TABLE,
+        String.class, CompactionLogEntry.class);
+    checkTableStatus(compactionLogTable, COMPACTION_LOG_TABLE,
+        addCacheMetrics);
   }
 
   /**
@@ -1942,6 +1956,11 @@ public class OmMetadataManagerImpl implements 
OMMetadataManager,
     return snapshotRenamedTable;
   }
 
+  @Override
+  public Table<String, CompactionLogEntry> getCompactionLogTable() {
+    return compactionLogTable;
+  }
+
   /**
    * Get Snapshot Chain Manager.
    *
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java
index 0d17faf1fb..056e1b015c 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hdds.utils.TransactionInfo;
 import org.apache.hadoop.ozone.om.service.SnapshotDeletingService;
 import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
 import 
org.apache.hadoop.ozone.storage.proto.OzoneManagerStorageProtos.PersistedUserVolumeInfo;
+import org.apache.ozone.compaction.log.CompactionLogEntry;
 
 import java.util.Map;
 
@@ -231,6 +232,15 @@ public class OMDBDefinition extends DBDefinition.WithMap {
           SnapshotInfo.class,
           SnapshotInfo.getCodec());
 
+  public static final DBColumnFamilyDefinition<String, CompactionLogEntry>
+      COMPACTION_LOG_TABLE =
+      new DBColumnFamilyDefinition<>(
+          OmMetadataManagerImpl.COMPACTION_LOG_TABLE,
+          String.class,
+          StringCodec.get(),
+          CompactionLogEntry.class,
+          CompactionLogEntry.getCodec());
+
   /**
    * SnapshotRenamedTable that complements the keyTable (or fileTable)
    * and dirTable entries of the immediately previous snapshot in the
@@ -268,6 +278,7 @@ public class OMDBDefinition extends DBDefinition.WithMap {
           S3_SECRET_TABLE,
           SNAPSHOT_INFO_TABLE,
           SNAPSHOT_RENAMED_TABLE,
+          COMPACTION_LOG_TABLE,
           TENANT_ACCESS_ID_TABLE,
           TENANT_STATE_TABLE,
           TRANSACTION_INFO_TABLE,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to