This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
commit ea67c5c4b2065d65b18b14a5a2be0d93735d1fb5 Author: JingsongLi <lzljs3620...@aliyun.com> AuthorDate: Thu Jan 13 17:04:10 2022 +0800 [FLINK-25629] Introduce UniversalCompaction CompactStrategy --- .../table/store/file/mergetree/LevelSortedRun.java | 64 ++++++ .../file/mergetree/compact/CompactStrategy.java | 30 +++ .../store/file/mergetree/compact/CompactUnit.java | 55 +++++ .../mergetree/compact/UniversalCompaction.java | 131 ++++++++++++ .../mergetree/compact/UniversalCompactionTest.java | 228 +++++++++++++++++++++ 5 files changed, 508 insertions(+) diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/LevelSortedRun.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/LevelSortedRun.java new file mode 100644 index 0000000..606cdb9 --- /dev/null +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/LevelSortedRun.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.file.mergetree; + +import java.util.Objects; + +/** {@link SortedRun} with level. 
*/ +public class LevelSortedRun { + + private final int level; + + private final SortedRun run; + + public LevelSortedRun(int level, SortedRun run) { + this.level = level; + this.run = run; + } + + public int level() { + return level; + } + + public SortedRun run() { + return run; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + LevelSortedRun that = (LevelSortedRun) o; + return level == that.level && Objects.equals(run, that.run); + } + + @Override + public int hashCode() { + return Objects.hash(level, run); + } + + @Override + public String toString() { + return "LevelSortedRun{" + "level=" + level + ", run=" + run + '}'; + } +} diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactStrategy.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactStrategy.java new file mode 100644 index 0000000..27e66b6 --- /dev/null +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactStrategy.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */

package org.apache.flink.table.store.file.mergetree.compact;

import org.apache.flink.table.store.file.mergetree.LevelSortedRun;

import java.util.List;
import java.util.Optional;

/** Compact strategy to decide which files to select for compaction. */
public interface CompactStrategy {

    /**
     * Picks a unit of files to compact from the given sorted runs.
     *
     * @param numLevels the total number of levels of the merge tree
     * @param runs all sorted runs, each paired with the level it lives on
     * @return the selected {@link CompactUnit}, or {@link Optional#empty()} if no compaction is
     *     currently needed
     */
    Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs);
}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactUnit.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactUnit.java
new file mode 100644
index 0000000..f7d5193
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactUnit.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.store.file.mergetree.compact;

import org.apache.flink.table.store.file.mergetree.LevelSortedRun;
import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;

import java.util.ArrayList;
import java.util.List;

/** A files unit for compaction.
*/ +interface CompactUnit { + + int outputLevel(); + + List<SstFileMeta> files(); + + static CompactUnit fromLevelRuns(int outputLevel, List<LevelSortedRun> runs) { + List<SstFileMeta> files = new ArrayList<>(); + for (LevelSortedRun run : runs) { + files.addAll(run.run().files()); + } + return fromFiles(outputLevel, files); + } + + static CompactUnit fromFiles(int outputLevel, List<SstFileMeta> files) { + return new CompactUnit() { + @Override + public int outputLevel() { + return outputLevel; + } + + @Override + public List<SstFileMeta> files() { + return files; + } + }; + } +} diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java new file mode 100644 index 0000000..a590085 --- /dev/null +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
 */

package org.apache.flink.table.store.file.mergetree.compact;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.table.store.file.mergetree.LevelSortedRun;
import org.apache.flink.table.store.file.mergetree.SortedRun;

import java.util.List;
import java.util.Optional;

/**
 * Universal Compaction Style is a compaction style, targeting the use cases requiring lower write
 * amplification, trading off read amplification and space amplification.
 *
 * <p>See RocksDb Universal-Compaction:
 * https://github.com/facebook/rocksdb/wiki/Universal-Compaction.
 */
public class UniversalCompaction implements CompactStrategy {

    // Max size amplification as a percentage: compares the total size of all runs except the
    // oldest against the oldest run's size; exceeding it triggers a full compaction.
    private final int maxSizeAmp;

    // Size ratio percentage used by the greedy size-ratio scan: the next run is merged into the
    // candidate while candidateSize * (100 + sizeRatio) / 100 >= next run's size.
    private final int sizeRatio;

    // Once the number of sorted runs exceeds this threshold, compaction is forced regardless of
    // the size-based triggers.
    private final int maxRunNum;

    public UniversalCompaction(int maxSizeAmp, int sizeRatio, int maxRunNum) {
        this.maxSizeAmp = maxSizeAmp;
        this.sizeRatio = sizeRatio;
        this.maxRunNum = maxRunNum;
    }

    @Override
    public Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs) {
        int maxLevel = numLevels - 1;

        // 1 checking for reducing size amplification
        CompactUnit unit = pickForSizeAmp(maxLevel, runs);
        if (unit != null) {
            return Optional.of(unit);
        }

        // 2 checking for size ratio
        unit = pickForSizeRatio(maxLevel, runs);
        if (unit != null) {
            return Optional.of(unit);
        }

        // 3 checking for file num
        if (runs.size() > maxRunNum) {
            // compacting for file num: merging the first (size - maxRunNum + 1) runs into one
            // leaves exactly maxRunNum runs afterwards
            return Optional.of(createUnit(runs, maxLevel, runs.size() - maxRunNum + 1));
        }

        return Optional.empty();
    }

    /**
     * Size-amplification trigger: if the combined size of all runs except the oldest exceeds
     * {@code maxSizeAmp} percent of the oldest run's size, compact everything to the max level.
     * Returns null when the trigger does not fire (or there are fewer than {@code maxRunNum} runs).
     */
    @VisibleForTesting
    CompactUnit pickForSizeAmp(int maxLevel, List<LevelSortedRun> runs) {
        if (runs.size() < maxRunNum) {
            return null;
        }

        // total size of every run except the last (oldest) one
        long candidateSize =
                runs.subList(0, runs.size() - 1).stream()
                        .map(LevelSortedRun::run)
                        .mapToLong(SortedRun::totalSize)
                        .sum();

        long earliestRunSize = runs.get(runs.size() - 1).run().totalSize();

        // size amplification = percentage of additional size
        if (candidateSize * 100 > maxSizeAmp * earliestRunSize) {
            return CompactUnit.fromLevelRuns(maxLevel, runs);
        }

        return null;
    }

    /**
     * Size-ratio trigger: starting from the newest run, greedily absorb the next run while the
     * current candidate's total size (inflated by {@code sizeRatio} percent) is at least the next
     * run's size. Picks the candidate if it contains more than one run; otherwise returns null.
     */
    @VisibleForTesting
    CompactUnit pickForSizeRatio(int maxLevel, List<LevelSortedRun> runs) {
        if (runs.size() < maxRunNum) {
            return null;
        }

        int candidateCount = 1;
        long candidateSize = runs.get(0).run().totalSize();

        for (int i = 1; i < runs.size(); i++) {
            LevelSortedRun next = runs.get(i);
            // stop as soon as the next run is more than (100 + sizeRatio)% of the candidate
            if (candidateSize * (100.0 + sizeRatio) / 100.0 < next.run().totalSize()) {
                break;
            }

            candidateSize += next.run().totalSize();
            candidateCount++;
        }

        if (candidateCount > 1) {
            return createUnit(runs, maxLevel, candidateCount);
        }

        return null;
    }

    /**
     * Builds a unit from the first {@code runCount} runs. The output level is {@code maxLevel}
     * when every run is picked; otherwise it is one level above (i.e. smaller than) the first
     * un-picked run's level, floored at 0.
     */
    @VisibleForTesting
    static CompactUnit createUnit(List<LevelSortedRun> runs, int maxLevel, int runCount) {
        int outputLevel;
        if (runCount == runs.size()) {
            outputLevel = maxLevel;
        } else {
            outputLevel = Math.max(0, runs.get(runCount).level() - 1);
        }

        return CompactUnit.fromLevelRuns(outputLevel, runs.subList(0, runCount));
    }
}
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java
new file mode 100644
index 0000000..17ae39a
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.file.mergetree.compact; + +import org.apache.flink.table.store.file.mergetree.LevelSortedRun; +import org.apache.flink.table.store.file.mergetree.SortedRun; +import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +import static org.apache.flink.table.store.file.mergetree.compact.UniversalCompaction.createUnit; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link UniversalCompaction}. 
 */
public class UniversalCompactionTest {

    // Verifies createUnit's output-level rule: maxLevel when all runs are picked, otherwise
    // one less than the first un-picked run's level (floored at 0).
    @Test
    public void testOutputLevel() {
        assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 1).outputLevel()).isEqualTo(0);
        assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 2).outputLevel()).isEqualTo(0);
        assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 3).outputLevel()).isEqualTo(2);
        assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 4).outputLevel()).isEqualTo(3);
        assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 5).outputLevel()).isEqualTo(5);
    }

    // Exercises each of pick()'s three triggers in order: size amplification, size ratio,
    // and run-count overflow.
    @Test
    public void testPick() {
        UniversalCompaction compaction = new UniversalCompaction(25, 1, 3);

        // by size amplification
        Optional<CompactUnit> pick = compaction.pick(3, level0(1, 2, 3, 3));
        assertThat(pick.isPresent()).isTrue();
        long[] results = pick.get().files().stream().mapToLong(SstFileMeta::fileSize).toArray();
        assertThat(results).isEqualTo(new long[] {1, 2, 3, 3});

        // by size ratio
        pick = compaction.pick(3, level0(1, 1, 1, 50));
        assertThat(pick.isPresent()).isTrue();
        results = pick.get().files().stream().mapToLong(SstFileMeta::fileSize).toArray();
        assertThat(results).isEqualTo(new long[] {1, 1, 1});

        // by file num
        pick = compaction.pick(3, level0(1, 2, 3, 50));
        assertThat(pick.isPresent()).isTrue();
        results = pick.get().files().stream().mapToLong(SstFileMeta::fileSize).toArray();
        assertThat(results).isEqualTo(new long[] {1, 2});
    }

    // Repeatedly prepends a run of size 1 and applies only the size-amplification trigger,
    // checking the resulting run sizes after each step.
    @Test
    public void testSizeAmplification() {
        UniversalCompaction compaction = new UniversalCompaction(25, 0, 1);
        long[] sizes = new long[] {1};
        sizes = appendAndPickForSizeAmp(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {2});
        sizes = appendAndPickForSizeAmp(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {3});
        sizes = appendAndPickForSizeAmp(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {4});
        sizes = appendAndPickForSizeAmp(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {1, 4});
        sizes = appendAndPickForSizeAmp(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {6});
        sizes = appendAndPickForSizeAmp(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {1, 6});
        sizes = appendAndPickForSizeAmp(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {8});
        sizes = appendAndPickForSizeAmp(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {1, 8});
        sizes = appendAndPickForSizeAmp(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {1, 1, 8});
        sizes = appendAndPickForSizeAmp(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {11});
        sizes = appendAndPickForSizeAmp(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {1, 11});
        sizes = appendAndPickForSizeAmp(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {1, 1, 11});
        sizes = appendAndPickForSizeAmp(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {14});
        sizes = appendAndPickForSizeAmp(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {1, 14});
        sizes = appendAndPickForSizeAmp(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {1, 1, 14});
        sizes = appendAndPickForSizeAmp(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {1, 1, 1, 14});
        sizes = appendAndPickForSizeAmp(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {18});
    }

    // Repeatedly prepends a run of size 1 and applies only the size-ratio trigger, checking
    // the resulting run sizes after each step.
    @Test
    public void testSizeRatio() {
        UniversalCompaction compaction = new UniversalCompaction(25, 1, 5);
        long[] sizes = new long[] {1, 1, 1, 1};
        sizes = appendAndPickForSizeRatio(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {5});
        sizes = appendAndPickForSizeRatio(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {1, 5});
        sizes = appendAndPickForSizeRatio(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {1, 1, 5});
        sizes = appendAndPickForSizeRatio(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {1, 1, 1, 5});
        sizes = appendAndPickForSizeRatio(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {4, 5});
        sizes = appendAndPickForSizeRatio(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {1, 4, 5});
        sizes = appendAndPickForSizeRatio(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {1, 1, 4, 5});
        sizes = appendAndPickForSizeRatio(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {3, 4, 5});
        sizes = appendAndPickForSizeRatio(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {1, 3, 4, 5});
        sizes = appendAndPickForSizeRatio(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {2, 3, 4, 5});
        sizes = appendAndPickForSizeRatio(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {1, 2, 3, 4, 5});
        sizes = appendAndPickForSizeRatio(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {16});
        sizes = appendAndPickForSizeRatio(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {1, 16});
        sizes = appendAndPickForSizeRatio(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {1, 1, 16});
        sizes = appendAndPickForSizeRatio(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {1, 1, 1, 16});
        sizes = appendAndPickForSizeRatio(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {4, 16});
        sizes = appendAndPickForSizeRatio(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {1, 4, 16});
        sizes = appendAndPickForSizeRatio(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {1, 1, 4, 16});
        sizes = appendAndPickForSizeRatio(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {3, 4, 16});
        sizes = appendAndPickForSizeRatio(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {1, 3, 4, 16});
        sizes = appendAndPickForSizeRatio(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {2, 3, 4, 16});
        sizes = appendAndPickForSizeRatio(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {1, 2, 3, 4, 16});
        sizes = appendAndPickForSizeRatio(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {11, 16});
    }

    // The same run sizes do or do not trigger a merge depending on the sizeRatio percentage.
    @Test
    public void testSizeRatioThreshold() {
        long[] sizes = new long[] {8, 9, 10};
        assertThat(pickForSizeRatio(new UniversalCompaction(25, 10, 2), sizes))
                .isEqualTo(new long[] {8, 9, 10});
        assertThat(pickForSizeRatio(new UniversalCompaction(25, 20, 2), sizes))
                .isEqualTo(new long[] {27});
    }

    // Builds one single-file run (of file size 1) per vararg.
    // NOTE(review): the loop variable is named "size" but is passed as the LEVEL of each run —
    // confirm this naming is intentional.
    private List<LevelSortedRun> createLevels(int... levels) {
        List<LevelSortedRun> runs = new ArrayList<>();
        for (int size : levels) {
            runs.add(new LevelSortedRun(size, SortedRun.fromSingle(file(1))));
        }
        return runs;
    }

    // Prepends a run of size 1, then runs the size-amplification check; if it fires, the
    // result collapses to a single run holding the total size.
    private long[] appendAndPickForSizeAmp(UniversalCompaction compaction, long... sizes) {
        sizes = addSize(sizes);
        CompactUnit unit = compaction.pickForSizeAmp(3, level0(sizes));
        if (unit != null) {
            return new long[] {
                unit.files().stream().mapToLong(SstFileMeta::fileSize).reduce(Long::sum).getAsLong()
            };
        }
        return sizes;
    }

    private long[] appendAndPickForSizeRatio(UniversalCompaction compaction, long... sizes) {
        return pickForSizeRatio(compaction, addSize(sizes));
    }

    // Runs the size-ratio check; if it fires, replaces the compacted run sizes with their sum
    // placed at the front (mirroring the merged run becoming the newest).
    private long[] pickForSizeRatio(UniversalCompaction compaction, long... sizes) {
        CompactUnit unit = compaction.pickForSizeRatio(3, level0(sizes));
        if (unit != null) {
            List<Long> compact =
                    unit.files().stream().map(SstFileMeta::fileSize).collect(Collectors.toList());
            List<Long> result = new ArrayList<>();
            for (long size : sizes) {
                result.add(size);
            }
            compact.forEach(result::remove);
            result.add(0, compact.stream().reduce(Long::sum).get());
            return result.stream().mapToLong(Long::longValue).toArray();
        }
        return sizes;
    }

    // Prepends a new run of size 1 (newest runs come first).
    private long[] addSize(long... sizes) {
        long[] newSizes = new long[sizes.length + 1];
        newSizes[0] = 1;
        System.arraycopy(sizes, 0, newSizes, 1, sizes.length);
        return newSizes;
    }

    // Builds one level-0 single-file run per given file size.
    private List<LevelSortedRun> level0(long... sizes) {
        List<LevelSortedRun> runs = new ArrayList<>();
        for (Long size : sizes) {
            runs.add(new LevelSortedRun(0, SortedRun.fromSingle(file(size))));
        }
        return runs;
    }

    // Minimal SstFileMeta stub: only fileSize matters to these tests.
    private SstFileMeta file(long size) {
        return new SstFileMeta("", size, 1, null, null, null, 0, 0, 0);
    }
}