This is an automated email from the ASF dual-hosted git repository.
hemant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 4788798724 HDDS-7601. Added new columnFamily: compactionLogTable to store compaction entries (#5303)
4788798724 is described below
commit 47887987244ba42ba6317b9db37a5b6f7c6e24f4
Author: Hemant Kumar <[email protected]>
AuthorDate: Mon Sep 18 18:45:26 2023 -0700
HDDS-7601. Added new columnFamily: compactionLogTable to store compaction entries (#5303)
---
.../interface-client/src/main/proto/hdds.proto | 17 +-
.../ozone/compaction/log/CompactionFileInfo.java | 164 +++++++++++++++
.../ozone/compaction/log/CompactionLogEntry.java | 193 ++++++++++++++++++
.../apache/ozone/compaction/log/package-info.java | 23 +++
.../ozone/rocksdiff/RocksDBCheckpointDiffer.java | 2 +-
.../compaction/log/TestCompactionFileInfo.java | 220 +++++++++++++++++++++
.../compaction/log/TestCompactionLogEntry.java | 146 ++++++++++++++
.../apache/hadoop/ozone/om/OMMetadataManager.java | 2 +
.../hadoop/ozone/om/OmMetadataManagerImpl.java | 23 ++-
.../hadoop/ozone/om/codec/OMDBDefinition.java | 11 ++
10 files changed, 797 insertions(+), 4 deletions(-)
diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
index 1479daa1c6..5c20745c06 100644
--- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
@@ -479,4 +479,19 @@ message DeletedBlocksTransactionInfo {
optional int64 containerID = 2;
repeated int64 localID = 3;
optional int32 count = 4;
-}
\ No newline at end of file
+}
+
+message CompactionFileInfoProto {
+ optional string fileName = 1;
+ optional string startKey = 2;
+ optional string endKey = 3;
+ optional string columnFamily = 4;
+}
+
+message CompactionLogEntryProto {
+ optional uint64 dbSequenceNumber = 1;
+ optional uint64 compactionTime = 2;
+ repeated CompactionFileInfoProto inputFileIntoList = 3;
+ repeated CompactionFileInfoProto outputFileIntoList = 4;
+ optional string compactionReason = 5;
+}
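
For context, a minimal sketch of how the new messages are populated from Java. The builder methods are the standard protobuf-generated ones for the fields declared above (note the inputFileIntoList/outputFileIntoList spelling); all literal values are illustrative:

    HddsProtos.CompactionFileInfoProto fileInfo =
        HddsProtos.CompactionFileInfoProto.newBuilder()
            .setFileName("000123.sst")              // illustrative SST name
            .setStartKey("key1")
            .setEndKey("key9")
            .setColumnFamily("keyTable")
            .build();

    HddsProtos.CompactionLogEntryProto logEntry =
        HddsProtos.CompactionLogEntryProto.newBuilder()
            .setDbSequenceNumber(1000L)
            .setCompactionTime(System.currentTimeMillis())
            .addInputFileIntoList(fileInfo)         // repeated field declared above
            .setCompactionReason("LevelL0FilesNum") // illustrative reason string
            .build();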
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/compaction/log/CompactionFileInfo.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/compaction/log/CompactionFileInfo.java
new file mode 100644
index 0000000000..68a9363b05
--- /dev/null
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/compaction/log/CompactionFileInfo.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ozone.compaction.log;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.util.Preconditions;
+
+import java.util.Objects;
+
+/**
+ * DAO to keep SST file information in the compaction log.
+ */
+public final class CompactionFileInfo {
+ private final String fileName;
+ private final String startKey;
+ private final String endKey;
+ private final String columnFamily;
+
+ private CompactionFileInfo(String fileName,
+ String startRange,
+ String endRange,
+ String columnFamily) {
+ this.fileName = fileName;
+ this.startKey = startRange;
+ this.endKey = endRange;
+ this.columnFamily = columnFamily;
+ }
+
+ public String getFileName() {
+ return fileName;
+ }
+
+ public String getStartKey() {
+ return startKey;
+ }
+
+ public String getEndKey() {
+ return endKey;
+ }
+
+ public String getColumnFamily() {
+ return columnFamily;
+ }
+
+ public HddsProtos.CompactionFileInfoProto getProtobuf() {
+ HddsProtos.CompactionFileInfoProto.Builder builder =
+ HddsProtos.CompactionFileInfoProto.newBuilder()
+ .setFileName(fileName);
+ if (startKey != null) {
+ builder = builder.setStartKey(startKey);
+ }
+ if (endKey != null) {
+ builder = builder.setEndKey(endKey);
+ }
+ if (columnFamily != null) {
+ builder = builder.setColumnFamily(columnFamily);
+ }
+ return builder.build();
+ }
+
+ public static CompactionFileInfo getFromProtobuf(
+ HddsProtos.CompactionFileInfoProto proto) {
+ Builder builder = new Builder(proto.getFileName());
+
+ if (proto.hasStartKey()) {
+ builder.setStartRange(proto.getStartKey());
+ }
+ if (proto.hasEndKey()) {
+ builder.setEndRange(proto.getEndKey());
+ }
+ if (proto.hasColumnFamily()) {
+ builder.setColumnFamily(proto.getColumnFamily());
+ }
+
+ return builder.build();
+ }
+
+ @Override
+ public String toString() {
+ return String.format("fileName: '%s', startKey: '%s', endKey: '%s'," +
+ " columnFamily: '%s'", fileName, startKey, endKey, columnFamily);
+ }
+
+ /**
+ * Builder of CompactionFileInfo.
+ */
+ public static class Builder {
+ private final String fileName;
+ private String startRange;
+ private String endRange;
+ private String columnFamily;
+
+ public Builder(String fileName) {
+ Preconditions.checkNotNull(fileName, "FileName is required parameter.");
+ this.fileName = fileName;
+ }
+
+ public Builder setStartRange(String startRange) {
+ this.startRange = startRange;
+ return this;
+ }
+
+ public Builder setEndRange(String endRange) {
+ this.endRange = endRange;
+ return this;
+ }
+
+ public Builder setColumnFamily(String columnFamily) {
+ this.columnFamily = columnFamily;
+ return this;
+ }
+
+ public CompactionFileInfo build() {
+ if ((startRange != null || endRange != null || columnFamily != null) &&
+ (startRange == null || endRange == null || columnFamily == null)) {
+ throw new IllegalArgumentException(
+ String.format("Either all of startRange, endRange and " +
+ "columnFamily should be non-null or null. " +
+ "startRange: '%s', endRange: '%s', columnFamily: '%s'.",
+ startRange, endRange, columnFamily));
+ }
+
+ return new CompactionFileInfo(fileName, startRange, endRange,
+ columnFamily);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof CompactionFileInfo)) {
+ return false;
+ }
+
+ CompactionFileInfo that = (CompactionFileInfo) o;
+ return Objects.equals(fileName, that.fileName) &&
+ Objects.equals(startKey, that.startKey) &&
+ Objects.equals(endKey, that.endKey) &&
+ Objects.equals(columnFamily, that.columnFamily);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(fileName, startKey, endKey, columnFamily);
+ }
+}
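
A brief usage sketch for the class above (values illustrative). The builder enforces that startRange, endRange and columnFamily are either all set or all absent, and the proto conversion round-trips:

    CompactionFileInfo fileInfo = new CompactionFileInfo.Builder("000123.sst")
        .setStartRange("key1")
        .setEndRange("key9")
        .setColumnFamily("keyTable")
        .build();   // throws IllegalArgumentException if only some of the
                    // three optional fields are set

    HddsProtos.CompactionFileInfoProto proto = fileInfo.getProtobuf();
    CompactionFileInfo restored = CompactionFileInfo.getFromProtobuf(proto);
    // equals() compares all four fields, so restored.equals(fileInfo) holds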
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/compaction/log/CompactionLogEntry.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/compaction/log/CompactionLogEntry.java
new file mode 100644
index 0000000000..41a003515d
--- /dev/null
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/compaction/log/CompactionLogEntry.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ozone.compaction.log;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.CompactionLogEntryProto;
+import org.apache.hadoop.hdds.utils.db.Codec;
+import org.apache.hadoop.hdds.utils.db.CopyObject;
+import org.apache.hadoop.hdds.utils.db.DelegatedCodec;
+import org.apache.hadoop.hdds.utils.db.Proto2Codec;
+import org.apache.hadoop.util.Preconditions;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Compaction log entry DAO to write to the compaction log file.
+ */
+public final class CompactionLogEntry implements
+ CopyObject<CompactionLogEntry> {
+ private static final Codec<CompactionLogEntry> CODEC = new DelegatedCodec<>(
+ Proto2Codec.get(CompactionLogEntryProto.class),
+ CompactionLogEntry::getFromProtobuf,
+ CompactionLogEntry::getProtobuf);
+
+ public static Codec<CompactionLogEntry> getCodec() {
+ return CODEC;
+ }
+
+ private final long dbSequenceNumber;
+ private final long compactionTime;
+ private final List<CompactionFileInfo> inputFileInfoList;
+ private final List<CompactionFileInfo> outputFileInfoList;
+ private final String compactionReason;
+
+ private CompactionLogEntry(long dbSequenceNumber,
+ long compactionTime,
+ List<CompactionFileInfo> inputFileInfoList,
+ List<CompactionFileInfo> outputFileInfoList,
+ String compactionReason) {
+ this.dbSequenceNumber = dbSequenceNumber;
+ this.compactionTime = compactionTime;
+ this.inputFileInfoList = inputFileInfoList;
+ this.outputFileInfoList = outputFileInfoList;
+ this.compactionReason = compactionReason;
+ }
+
+ public List<CompactionFileInfo> getInputFileInfoList() {
+ return inputFileInfoList;
+ }
+
+ public List<CompactionFileInfo> getOutputFileInfoList() {
+ return outputFileInfoList;
+ }
+
+ public long getDbSequenceNumber() {
+ return dbSequenceNumber;
+ }
+
+ public long getCompactionTime() {
+ return compactionTime;
+ }
+
+ public String getCompactionReason() {
+ return compactionReason;
+ }
+
+ public CompactionLogEntryProto getProtobuf() {
+ CompactionLogEntryProto.Builder builder = CompactionLogEntryProto
+ .newBuilder()
+ .setDbSequenceNumber(dbSequenceNumber)
+ .setCompactionTime(compactionTime);
+
+ if (compactionReason != null) {
+ builder.setCompactionReason(compactionReason);
+ }
+
+ inputFileInfoList.forEach(fileInfo ->
+ builder.addInputFileIntoList(fileInfo.getProtobuf()));
+
+ outputFileInfoList.forEach(fileInfo ->
+ builder.addOutputFileIntoList(fileInfo.getProtobuf()));
+
+ return builder.build();
+ }
+
+ public static CompactionLogEntry getFromProtobuf(
+ CompactionLogEntryProto proto) {
+ List<CompactionFileInfo> inputFileInfo = proto.getInputFileIntoListList()
+ .stream()
+ .map(CompactionFileInfo::getFromProtobuf)
+ .collect(Collectors.toList());
+
+ List<CompactionFileInfo> outputFileInfo = proto.getOutputFileIntoListList()
+ .stream()
+ .map(CompactionFileInfo::getFromProtobuf)
+ .collect(Collectors.toList());
+ Builder builder = new Builder(proto.getDbSequenceNumber(),
+ proto.getCompactionTime(), inputFileInfo, outputFileInfo);
+
+ if (proto.hasCompactionReason()) {
+ builder.setCompactionReason(proto.getCompactionReason());
+ }
+ return builder.build();
+ }
+
+ @Override
+ public String toString() {
+ return String.format("dbSequenceNumber: '%s', compactionTime: '%s', " +
+ "inputFileInfoList: '%s', outputFileInfoList: '%s', " +
+ "compactionReason: '%s'.", dbSequenceNumber, compactionTime,
+ inputFileInfoList, outputFileInfoList, compactionReason);
+ }
+
+ /**
+ * Builder of CompactionLogEntry.
+ */
+ public static class Builder {
+ private final long dbSequenceNumber;
+ private final long compactionTime;
+ private final List<CompactionFileInfo> inputFileInfoList;
+ private final List<CompactionFileInfo> outputFileInfoList;
+ private String compactionReason;
+
+ public Builder(long dbSequenceNumber, long compactionTime,
+ List<CompactionFileInfo> inputFileInfoList,
+ List<CompactionFileInfo> outputFileInfoList) {
+ Preconditions.checkNotNull(inputFileInfoList,
+ "inputFileInfoList is required parameter.");
+ Preconditions.checkNotNull(outputFileInfoList,
+ "outputFileInfoList is required parameter.");
+ this.dbSequenceNumber = dbSequenceNumber;
+ this.compactionTime = compactionTime;
+ this.inputFileInfoList = inputFileInfoList;
+ this.outputFileInfoList = outputFileInfoList;
+ }
+
+ public Builder setCompactionReason(String compactionReason) {
+ this.compactionReason = compactionReason;
+ return this;
+ }
+
+ public CompactionLogEntry build() {
+ return new CompactionLogEntry(dbSequenceNumber, compactionTime,
+ inputFileInfoList, outputFileInfoList, compactionReason);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof CompactionLogEntry)) {
+ return false;
+ }
+
+ CompactionLogEntry that = (CompactionLogEntry) o;
+ return dbSequenceNumber == that.dbSequenceNumber &&
+ compactionTime == that.compactionTime &&
+ Objects.equals(inputFileInfoList, that.inputFileInfoList) &&
+ Objects.equals(outputFileInfoList, that.outputFileInfoList) &&
+ Objects.equals(compactionReason, that.compactionReason);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(dbSequenceNumber, compactionTime, inputFileInfoList,
+ outputFileInfoList, compactionReason);
+ }
+
+ @Override
+ public CompactionLogEntry copyObject() {
+ return new CompactionLogEntry(dbSequenceNumber, compactionTime,
+ inputFileInfoList, outputFileInfoList, compactionReason);
+ }
+}
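
A sketch of the codec in use, assuming Codec's toPersistedFormat/fromPersistedFormat pair (both throw IOException; values illustrative):

    CompactionLogEntry entry = new CompactionLogEntry.Builder(
            1000L,                      // dbSequenceNumber (illustrative)
            Time.now(),                 // compactionTime
            inputFileInfoList,          // required List<CompactionFileInfo>
            outputFileInfoList)         // required List<CompactionFileInfo>
        .setCompactionReason("LevelL0FilesNum")     // optional
        .build();

    Codec<CompactionLogEntry> codec = CompactionLogEntry.getCodec();
    byte[] bytes = codec.toPersistedFormat(entry);  // proto-backed encoding
    CompactionLogEntry restored = codec.fromPersistedFormat(bytes);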
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/compaction/log/package-info.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/compaction/log/package-info.java
new file mode 100644
index 0000000000..db93d83966
--- /dev/null
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/compaction/log/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ozone.compaction.log;
+
+/**
+ * This package contains POJO classes for Compaction information.
+ */
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
index fcc09eb28a..d8fe2d50ab 100644
--- a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
@@ -165,7 +165,7 @@ public class RocksDBCheckpointDiffer implements AutoCloseable,
* to save space.
*/
static final String SST_FILE_EXTENSION = ".sst";
- private static final int SST_FILE_EXTENSION_LENGTH =
+ public static final int SST_FILE_EXTENSION_LENGTH =
SST_FILE_EXTENSION.length();
private static final int LONG_MAX_STR_LEN =
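
The widened visibility presumably lets callers outside this class strip the extension when normalizing SST file names; a hedged sketch:

    String sstFile = "000123.sst";      // illustrative
    String trimmed = sstFile.substring(0,
        sstFile.length() - RocksDBCheckpointDiffer.SST_FILE_EXTENSION_LENGTH);
    // trimmed == "000123"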
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/compaction/log/TestCompactionFileInfo.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/compaction/log/TestCompactionFileInfo.java
new file mode 100644
index 0000000000..6de7330b13
--- /dev/null
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/compaction/log/TestCompactionFileInfo.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ozone.compaction.log;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.CompactionFileInfoProto;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+/**
+ * Test class for CompactionFileInfo.
+ */
+public class TestCompactionFileInfo {
+
+ private static Stream<Arguments> compactionFileInfoValidScenarios() {
+ return Stream.of(
+ Arguments.of("All parameters are present.",
+ "fileName",
+ "startRange",
+ "endRange",
+ "columnFamily"
+ ),
+ Arguments.of("Only fileName is present.",
+ "fileName",
+ null,
+ null,
+ null
+ )
+ );
+ }
+
+ @ParameterizedTest(name = "{0}")
+ @MethodSource("compactionFileInfoValidScenarios")
+ public void testCompactionFileInfoValidScenario(String description,
+ String fileName,
+ String startRange,
+ String endRange,
+ String columnFamily) {
+
+ CompactionFileInfo compactionFileInfo =
+ new CompactionFileInfo.Builder(fileName).setStartRange(startRange)
+ .setEndRange(endRange).setColumnFamily(columnFamily).build();
+ Assertions.assertNotNull(compactionFileInfo);
+ }
+
+ private static Stream<Arguments> compactionFileInfoInvalidScenarios() {
+ return Stream.of(
+ Arguments.of("All parameters are null.",
+ null,
+ null,
+ null,
+ null,
+ "FileName is required parameter."
+ ),
+ Arguments.of("fileName is null.",
+ null,
+ "startRange",
+ "endRange",
+ "columnFamily",
+ "FileName is required parameter."
+ ),
+ Arguments.of("startRange is not present.",
+ "fileName",
+ null,
+ "endRange",
+ "columnFamily",
+ "Either all of startRange, endRange and columnFamily" +
+ " should be non-null or null. startRange: 'null', " +
+ "endRange: 'endRange', columnFamily: 'columnFamily'."
+ ),
+ Arguments.of("endRange is not present.",
+ "fileName",
+ "startRange",
+ null,
+ "columnFamily",
+ "Either all of startRange, endRange and columnFamily" +
+ " should be non-null or null. startRange: 'startRange', " +
+ "endRange: 'null', columnFamily: 'columnFamily'."
+ ),
+ Arguments.of("columnFamily is not present.",
+ "fileName",
+ "startRange",
+ "endRange",
+ null,
+ "Either all of startRange, endRange and columnFamily" +
+ " should be non-null or null. startRange: 'startRange', " +
+ "endRange: 'endRange', columnFamily: 'null'."
+ ),
+ Arguments.of("startRange and endRange are not present.",
+ "fileName",
+ null,
+ null,
+ "columnFamily",
+ "Either all of startRange, endRange and columnFamily" +
+ " should be non-null or null. startRange: 'null', " +
+ "endRange: 'null', columnFamily: 'columnFamily'."
+ ),
+ Arguments.of("endRange and columnFamily are not present.",
+ "fileName",
+ "startRange",
+ null,
+ null,
+ "Either all of startRange, endRange and columnFamily " +
+ "should be non-null or null. startRange: 'startRange', " +
+ "endRange: 'null', columnFamily: 'null'."
+ ),
+ Arguments.of("startRange and columnFamily are not present.",
+ "fileName",
+ null,
+ "endRange",
+ null,
+ "Either all of startRange, endRange and columnFamily" +
+ " should be non-null or null. startRange: 'null', " +
+ "endRange: 'endRange', columnFamily: 'null'."
+ )
+ );
+ }
+
+ @ParameterizedTest(name = "{0}")
+ @MethodSource("compactionFileInfoInvalidScenarios")
+ public void testCompactionFileInfoInvalidScenario(String description,
+ String fileName,
+ String startRange,
+ String endRange,
+ String columnFamily,
+ String expectedMessage) {
+ RuntimeException exception = Assertions.assertThrows(RuntimeException.class,
+ () -> new CompactionFileInfo.Builder(fileName).setStartRange(startRange)
+ .setEndRange(endRange).setColumnFamily(columnFamily).build());
+ assertEquals(expectedMessage, exception.getMessage());
+ }
+
+ @ParameterizedTest(name = "{0}")
+ @MethodSource("compactionFileInfoValidScenarios")
+ public void testGetProtobuf(String description,
+ String fileName,
+ String startRange,
+ String endRange,
+ String columnFamily) {
+ CompactionFileInfo compactionFileInfo = new CompactionFileInfo
+ .Builder(fileName)
+ .setStartRange(startRange)
+ .setEndRange(endRange)
+ .setColumnFamily(columnFamily)
+ .build();
+
+ CompactionFileInfoProto protobuf = compactionFileInfo.getProtobuf();
+ assertEquals(fileName, protobuf.getFileName());
+
+ if (startRange != null) {
+ assertEquals(startRange, protobuf.getStartKey());
+ } else {
+ assertFalse(protobuf.hasStartKey());
+ }
+ if (endRange != null) {
+ assertEquals(endRange, protobuf.getEndKey());
+ } else {
+ assertFalse(protobuf.hasEndKey());
+ }
+ if (columnFamily != null) {
+ assertEquals(columnFamily, protobuf.getColumnFamily());
+ } else {
+ assertFalse(protobuf.hasColumnFamily());
+ }
+ }
+
+ @ParameterizedTest(name = "{0}")
+ @MethodSource("compactionFileInfoValidScenarios")
+ public void testFromProtobuf(String description,
+ String fileName,
+ String startRange,
+ String endRange,
+ String columnFamily) {
+ CompactionFileInfoProto.Builder builder = CompactionFileInfoProto
+ .newBuilder()
+ .setFileName(fileName);
+
+ if (startRange != null) {
+ builder = builder.setStartKey(startRange);
+ }
+ if (endRange != null) {
+ builder = builder.setEndKey(endRange);
+ }
+ if (columnFamily != null) {
+ builder = builder.setColumnFamily(columnFamily);
+ }
+
+ CompactionFileInfoProto protobuf = builder.build();
+
+ CompactionFileInfo compactionFileInfo =
+ CompactionFileInfo.getFromProtobuf(protobuf);
+
+ assertEquals(fileName, compactionFileInfo.getFileName());
+ assertEquals(startRange, compactionFileInfo.getStartKey());
+ assertEquals(endRange, compactionFileInfo.getEndKey());
+ assertEquals(columnFamily, compactionFileInfo.getColumnFamily());
+ }
+}
diff --git a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/compaction/log/TestCompactionLogEntry.java b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/compaction/log/TestCompactionLogEntry.java
new file mode 100644
index 0000000000..bdbdac9403
--- /dev/null
+++ b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/compaction/log/TestCompactionLogEntry.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ozone.compaction.log;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.CompactionLogEntryProto;
+import org.apache.hadoop.util.Time;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * Test class for CompactionLogEntry.
+ */
+public class TestCompactionLogEntry {
+
+ private static Stream<Arguments> compactionLogEntryValidScenarios() {
+ List<CompactionFileInfo> inputFiles = Arrays.asList(
+ new CompactionFileInfo.Builder("inputFileName1").setStartRange("key1")
+ .setEndRange("key5").setColumnFamily("columnFamily").build(),
+ new CompactionFileInfo.Builder("inputFileName2").setStartRange("key6")
+ .setEndRange("key11").setColumnFamily("columnFamily").build(),
+ new CompactionFileInfo.Builder("inputFileName3").setStartRange("key12")
+ .setEndRange("key19").setColumnFamily("columnFamily").build());
+ List<CompactionFileInfo> outputFiles = Arrays.asList(
+ new CompactionFileInfo.Builder("outputFileName1").setStartRange("key1")
+ .setEndRange("key8").setColumnFamily("columnFamily").build(),
+ new CompactionFileInfo.Builder("outputFileName2").setStartRange("key9")
+ .setEndRange("key19").setColumnFamily("columnFamily").build());
+
+ return Stream.of(
+ Arguments.of("With compaction reason.",
+ 1000,
+ Time.now(),
+ inputFiles,
+ outputFiles,
+ "compactionReason"
+ ),
+ Arguments.of("Without compaction reason.",
+ 2000,
+ Time.now(),
+ inputFiles,
+ outputFiles,
+ null
+ )
+ );
+ }
+
+ @ParameterizedTest(name = "{0}")
+ @MethodSource("compactionLogEntryValidScenarios")
+ public void testGetProtobuf(
+ String description,
+ long dbSequenceNumber,
+ long compactionTime,
+ List<CompactionFileInfo> inputFiles,
+ List<CompactionFileInfo> outputFiles,
+ String compactionReason) {
+ CompactionLogEntry.Builder builder = new CompactionLogEntry
+ .Builder(dbSequenceNumber, compactionTime, inputFiles, outputFiles);
+
+ if (compactionReason != null) {
+ builder.setCompactionReason(compactionReason);
+ }
+
+ CompactionLogEntry compactionLogEntry = builder.build();
+ assertNotNull(compactionLogEntry);
+ CompactionLogEntryProto protobuf =
+ compactionLogEntry.getProtobuf();
+ assertEquals(dbSequenceNumber, protobuf.getDbSequenceNumber());
+ assertEquals(compactionTime, protobuf.getCompactionTime());
+ assertEquals(inputFiles, protobuf.getInputFileIntoListList().stream()
+ .map(CompactionFileInfo::getFromProtobuf)
+ .collect(Collectors.toList()));
+ assertEquals(outputFiles, protobuf.getOutputFileIntoListList().stream()
+ .map(CompactionFileInfo::getFromProtobuf)
+ .collect(Collectors.toList()));
+ if (compactionReason != null) {
+ assertEquals(compactionReason, protobuf.getCompactionReason());
+ } else {
+ assertFalse(protobuf.hasCompactionReason());
+ }
+ }
+
+
+ @ParameterizedTest(name = "{0}")
+ @MethodSource("compactionLogEntryValidScenarios")
+ public void testFromProtobuf(
+ String description,
+ long dbSequenceNumber,
+ long compactionTime,
+ List<CompactionFileInfo> inputFiles,
+ List<CompactionFileInfo> outputFiles,
+ String compactionReason) {
+
+ CompactionLogEntryProto.Builder builder = CompactionLogEntryProto
+ .newBuilder()
+ .setDbSequenceNumber(dbSequenceNumber)
+ .setCompactionTime(compactionTime);
+
+ if (compactionReason != null) {
+ builder.setCompactionReason(compactionReason);
+ }
+
+ inputFiles.forEach(fileInfo ->
+ builder.addInputFileIntoList(fileInfo.getProtobuf()));
+
+ outputFiles.forEach(fileInfo ->
+ builder.addOutputFileIntoList(fileInfo.getProtobuf()));
+
+ CompactionLogEntryProto protobuf1 = builder.build();
+
+ CompactionLogEntry compactionLogEntry =
+ CompactionLogEntry.getFromProtobuf(protobuf1);
+
+ assertNotNull(compactionLogEntry);
+ assertEquals(dbSequenceNumber, compactionLogEntry.getDbSequenceNumber());
+ assertEquals(compactionTime, compactionLogEntry.getCompactionTime());
+ assertEquals(inputFiles, compactionLogEntry.getInputFileInfoList());
+ assertEquals(outputFiles, compactionLogEntry.getOutputFileInfoList());
+ assertEquals(compactionReason, compactionLogEntry.getCompactionReason());
+ }
+}
diff --git a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
index 3dcabf088e..0832344dce 100644
--- a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
+++ b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.Table;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.ozone.compaction.log.CompactionLogEntry;
/**
* OM metadata manager interface.
@@ -378,6 +379,7 @@ public interface OMMetadataManager extends DBStoreHAManager {
Table<String, String> getSnapshotRenamedTable();
+ Table<String, CompactionLogEntry> getCompactionLogTable();
/**
* Gets the OM Meta table.
* @return meta table reference.
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index 2aee67e91c..512ff167b6 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -109,6 +109,7 @@ import static org.apache.hadoop.ozone.om.service.SnapshotDeletingService.isBlock
 import static org.apache.hadoop.ozone.om.snapshot.SnapshotUtils.checkSnapshotDirExist;
import org.apache.hadoop.util.Time;
+import org.apache.ozone.compaction.log.CompactionLogEntry;
import org.apache.ratis.util.ExitUtils;
import org.eclipse.jetty.util.StringUtil;
import org.slf4j.Logger;
@@ -197,6 +198,8 @@ public class OmMetadataManagerImpl implements OMMetadataManager,
 * |                    | 2. /volumeId/bucketId/parentId/fileName             |
 * |                    | 3. /volumeName/bucketName/keyName                   |
 * |-------------------------------------------------------------------------|
+ * | compactionLogTable | dbTrxId-compactionTime -> compactionLogEntry       |
+ * |-------------------------------------------------------------------------|
*/
public static final String USER_TABLE = "userTable";
@@ -225,6 +228,8 @@ public class OmMetadataManagerImpl implements OMMetadataManager,
public static final String SNAPSHOT_INFO_TABLE = "snapshotInfoTable";
public static final String SNAPSHOT_RENAMED_TABLE =
"snapshotRenamedTable";
+ public static final String COMPACTION_LOG_TABLE =
+ "compactionLogTable";
static final String[] ALL_TABLES = new String[] {
USER_TABLE,
@@ -247,7 +252,8 @@ public class OmMetadataManagerImpl implements OMMetadataManager,
PRINCIPAL_TO_ACCESS_IDS_TABLE,
TENANT_STATE_TABLE,
SNAPSHOT_INFO_TABLE,
- SNAPSHOT_RENAMED_TABLE
+ SNAPSHOT_RENAMED_TABLE,
+ COMPACTION_LOG_TABLE
};
private DBStore store;
@@ -277,6 +283,7 @@ public class OmMetadataManagerImpl implements OMMetadataManager,
private Table snapshotInfoTable;
private Table snapshotRenamedTable;
+ private Table compactionLogTable;
private boolean isRatisEnabled;
private boolean ignorePipelineinKey;
@@ -615,6 +622,7 @@ public class OmMetadataManagerImpl implements OMMetadataManager,
.addTable(TENANT_STATE_TABLE)
.addTable(SNAPSHOT_INFO_TABLE)
.addTable(SNAPSHOT_RENAMED_TABLE)
+ .addTable(COMPACTION_LOG_TABLE)
.addCodec(OzoneTokenIdentifier.class, TokenIdentifierCodec.get())
.addCodec(OmKeyInfo.class, OmKeyInfo.getCodec(true))
.addCodec(RepeatedOmKeyInfo.class, RepeatedOmKeyInfo.getCodec(true))
@@ -629,7 +637,8 @@ public class OmMetadataManagerImpl implements OMMetadataManager,
.addCodec(OmDBTenantState.class, OmDBTenantState.getCodec())
.addCodec(OmDBAccessIdInfo.class, OmDBAccessIdInfo.getCodec())
.addCodec(OmDBUserPrincipalInfo.class,
OmDBUserPrincipalInfo.getCodec())
- .addCodec(SnapshotInfo.class, SnapshotInfo.getCodec());
+ .addCodec(SnapshotInfo.class, SnapshotInfo.getCodec())
+ .addCodec(CompactionLogEntry.class, CompactionLogEntry.getCodec());
}
/**
@@ -742,6 +751,11 @@ public class OmMetadataManagerImpl implements OMMetadataManager,
checkTableStatus(snapshotRenamedTable, SNAPSHOT_RENAMED_TABLE,
addCacheMetrics);
// TODO: [SNAPSHOT] Initialize table lock for snapshotRenamedTable.
+
+ compactionLogTable = this.store.getTable(COMPACTION_LOG_TABLE,
+ String.class, CompactionLogEntry.class);
+ checkTableStatus(compactionLogTable, COMPACTION_LOG_TABLE,
+ addCacheMetrics);
}
/**
@@ -1942,6 +1956,11 @@ public class OmMetadataManagerImpl implements OMMetadataManager,
return snapshotRenamedTable;
}
+ @Override
+ public Table<String, CompactionLogEntry> getCompactionLogTable() {
+ return compactionLogTable;
+ }
+
/**
* Get Snapshot Chain Manager.
*
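
With the table registered, writes go through the regular Table API. A minimal sketch; the helper name is hypothetical, and the hyphen-joined key follows the dbTrxId-compactionTime format from the layout comment above:

    static void addToCompactionLogTable(OMMetadataManager omMetadataManager,
        CompactionLogEntry entry) throws IOException {
      // Key format per the table-layout comment: dbTrxId-compactionTime.
      String key = entry.getDbSequenceNumber() + "-" + entry.getCompactionTime();
      omMetadataManager.getCompactionLogTable().put(key, entry);
    }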
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java
index 0d17faf1fb..056e1b015c 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/codec/OMDBDefinition.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.hadoop.ozone.om.service.SnapshotDeletingService;
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
import org.apache.hadoop.ozone.storage.proto.OzoneManagerStorageProtos.PersistedUserVolumeInfo;
+import org.apache.ozone.compaction.log.CompactionLogEntry;
import java.util.Map;
@@ -231,6 +232,15 @@ public class OMDBDefinition extends DBDefinition.WithMap {
SnapshotInfo.class,
SnapshotInfo.getCodec());
+ public static final DBColumnFamilyDefinition<String, CompactionLogEntry>
+ COMPACTION_LOG_TABLE =
+ new DBColumnFamilyDefinition<>(
+ OmMetadataManagerImpl.COMPACTION_LOG_TABLE,
+ String.class,
+ StringCodec.get(),
+ CompactionLogEntry.class,
+ CompactionLogEntry.getCodec());
+
/**
* SnapshotRenamedTable that complements the keyTable (or fileTable)
* and dirTable entries of the immediately previous snapshot in the
@@ -268,6 +278,7 @@ public class OMDBDefinition extends DBDefinition.WithMap {
S3_SECRET_TABLE,
SNAPSHOT_INFO_TABLE,
SNAPSHOT_RENAMED_TABLE,
+ COMPACTION_LOG_TABLE,
TENANT_ACCESS_ID_TABLE,
TENANT_STATE_TABLE,
TRANSACTION_INFO_TABLE,
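
Registering COMPACTION_LOG_TABLE here keeps OMDBDefinition's column-family map complete, so generic tooling can resolve the table's key/value codecs by name. A small sketch, assuming the definition's standard accessors:

    DBColumnFamilyDefinition<String, CompactionLogEntry> definition =
        OMDBDefinition.COMPACTION_LOG_TABLE;
    String tableName = definition.getName();        // "compactionLogTable"
    Codec<CompactionLogEntry> valueCodec = definition.getValueCodec();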
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]