This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git

commit ea67c5c4b2065d65b18b14a5a2be0d93735d1fb5
Author: JingsongLi <lzljs3620...@aliyun.com>
AuthorDate: Thu Jan 13 17:04:10 2022 +0800

    [FLINK-25629] Introduce UniversalCompaction CompactStrategy
---
 .../table/store/file/mergetree/LevelSortedRun.java |  64 ++++++
 .../file/mergetree/compact/CompactStrategy.java    |  30 +++
 .../store/file/mergetree/compact/CompactUnit.java  |  55 +++++
 .../mergetree/compact/UniversalCompaction.java     | 131 ++++++++++++
 .../mergetree/compact/UniversalCompactionTest.java | 228 +++++++++++++++++++++
 5 files changed, 508 insertions(+)

diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/LevelSortedRun.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/LevelSortedRun.java
new file mode 100644
index 0000000..606cdb9
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/LevelSortedRun.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree;
+
+import java.util.Objects;
+
+/** {@link SortedRun} with level. */
+public class LevelSortedRun {
+
+    private final int level;
+
+    private final SortedRun run;
+
+    public LevelSortedRun(int level, SortedRun run) {
+        this.level = level;
+        this.run = run;
+    }
+
+    public int level() {
+        return level;
+    }
+
+    public SortedRun run() {
+        return run;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        LevelSortedRun that = (LevelSortedRun) o;
+        return level == that.level && Objects.equals(run, that.run);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(level, run);
+    }
+
+    @Override
+    public String toString() {
+        return "LevelSortedRun{" + "level=" + level + ", run=" + run + '}';
+    }
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactStrategy.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactStrategy.java
new file mode 100644
index 0000000..27e66b6
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactStrategy.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.store.file.mergetree.LevelSortedRun;
+
+import java.util.List;
+import java.util.Optional;
+
+/** Compact strategy to decide which files to select for compaction. */
+public interface CompactStrategy {
+
+    Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> runs);
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactUnit.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactUnit.java
new file mode 100644
index 0000000..f7d5193
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactUnit.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.store.file.mergetree.LevelSortedRun;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** A files unit for compaction. */
+interface CompactUnit {
+
+    int outputLevel();
+
+    List<SstFileMeta> files();
+
+    static CompactUnit fromLevelRuns(int outputLevel, List<LevelSortedRun> 
runs) {
+        List<SstFileMeta> files = new ArrayList<>();
+        for (LevelSortedRun run : runs) {
+            files.addAll(run.run().files());
+        }
+        return fromFiles(outputLevel, files);
+    }
+
+    static CompactUnit fromFiles(int outputLevel, List<SstFileMeta> files) {
+        return new CompactUnit() {
+            @Override
+            public int outputLevel() {
+                return outputLevel;
+            }
+
+            @Override
+            public List<SstFileMeta> files() {
+                return files;
+            }
+        };
+    }
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java
new file mode 100644
index 0000000..a590085
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.store.file.mergetree.LevelSortedRun;
+import org.apache.flink.table.store.file.mergetree.SortedRun;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Universal Compaction Style is a compaction style, targeting the use cases 
requiring lower write
+ * amplification, trading off read amplification and space amplification.
+ *
+ * <p>See RocksDb Universal-Compaction:
+ * https://github.com/facebook/rocksdb/wiki/Universal-Compaction.
+ */
+public class UniversalCompaction implements CompactStrategy {
+
+    private final int maxSizeAmp;
+    private final int sizeRatio;
+    private final int maxRunNum;
+
+    public UniversalCompaction(int maxSizeAmp, int sizeRatio, int maxRunNum) {
+        this.maxSizeAmp = maxSizeAmp;
+        this.sizeRatio = sizeRatio;
+        this.maxRunNum = maxRunNum;
+    }
+
+    @Override
+    public Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun> 
runs) {
+        int maxLevel = numLevels - 1;
+
+        // 1 checking for reducing size amplification
+        CompactUnit unit = pickForSizeAmp(maxLevel, runs);
+        if (unit != null) {
+            return Optional.of(unit);
+        }
+
+        // 2 checking for size ratio
+        unit = pickForSizeRatio(maxLevel, runs);
+        if (unit != null) {
+            return Optional.of(unit);
+        }
+
+        // 3 checking for file num
+        if (runs.size() > maxRunNum) {
+            // compacting for file num
+            return Optional.of(createUnit(runs, maxLevel, runs.size() - 
maxRunNum + 1));
+        }
+
+        return Optional.empty();
+    }
+
+    @VisibleForTesting
+    CompactUnit pickForSizeAmp(int maxLevel, List<LevelSortedRun> runs) {
+        if (runs.size() < maxRunNum) {
+            return null;
+        }
+
+        long candidateSize =
+                runs.subList(0, runs.size() - 1).stream()
+                        .map(LevelSortedRun::run)
+                        .mapToLong(SortedRun::totalSize)
+                        .sum();
+
+        long earliestRunSize = runs.get(runs.size() - 1).run().totalSize();
+
+        // size amplification = percentage of additional size
+        if (candidateSize * 100 > maxSizeAmp * earliestRunSize) {
+            return CompactUnit.fromLevelRuns(maxLevel, runs);
+        }
+
+        return null;
+    }
+
+    @VisibleForTesting
+    CompactUnit pickForSizeRatio(int maxLevel, List<LevelSortedRun> runs) {
+        if (runs.size() < maxRunNum) {
+            return null;
+        }
+
+        int candidateCount = 1;
+        long candidateSize = runs.get(0).run().totalSize();
+
+        for (int i = 1; i < runs.size(); i++) {
+            LevelSortedRun next = runs.get(i);
+            if (candidateSize * (100.0 + sizeRatio) / 100.0 < 
next.run().totalSize()) {
+                break;
+            }
+
+            candidateSize += next.run().totalSize();
+            candidateCount++;
+        }
+
+        if (candidateCount > 1) {
+            return createUnit(runs, maxLevel, candidateCount);
+        }
+
+        return null;
+    }
+
+    @VisibleForTesting
+    static CompactUnit createUnit(List<LevelSortedRun> runs, int maxLevel, int 
runCount) {
+        int outputLevel;
+        if (runCount == runs.size()) {
+            outputLevel = maxLevel;
+        } else {
+            outputLevel = Math.max(0, runs.get(runCount).level() - 1);
+        }
+
+        return CompactUnit.fromLevelRuns(outputLevel, runs.subList(0, 
runCount));
+    }
+}
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java
new file mode 100644
index 0000000..17ae39a
--- /dev/null
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.store.file.mergetree.LevelSortedRun;
+import org.apache.flink.table.store.file.mergetree.SortedRun;
+import org.apache.flink.table.store.file.mergetree.sst.SstFileMeta;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.table.store.file.mergetree.compact.UniversalCompaction.createUnit;
+import static org.assertj.core.api.Assertions.assertThat;
+
/** Test for {@link UniversalCompaction}. */
public class UniversalCompactionTest {

    @Test
    public void testOutputLevel() {
        // Output level is one above the first run left out of the compaction,
        // clamped to >= 0; compacting all runs goes to the max level (5).
        assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 1).outputLevel()).isEqualTo(0);
        assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 2).outputLevel()).isEqualTo(0);
        assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 3).outputLevel()).isEqualTo(2);
        assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 4).outputLevel()).isEqualTo(3);
        assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 5).outputLevel()).isEqualTo(5);
    }

    @Test
    public void testPick() {
        // maxSizeAmp=25%, sizeRatio=1%, maxRunNum=3
        UniversalCompaction compaction = new UniversalCompaction(25, 1, 3);

        // by size amplification: (1+2+3) * 100 > 25 * 3 -> compact everything
        Optional<CompactUnit> pick = compaction.pick(3, level0(1, 2, 3, 3));
        assertThat(pick.isPresent()).isTrue();
        long[] results = pick.get().files().stream().mapToLong(SstFileMeta::fileSize).toArray();
        assertThat(results).isEqualTo(new long[] {1, 2, 3, 3});

        // by size ratio: the three 1-sized runs merge, the 50-sized run is too big
        pick = compaction.pick(3, level0(1, 1, 1, 50));
        assertThat(pick.isPresent()).isTrue();
        results = pick.get().files().stream().mapToLong(SstFileMeta::fileSize).toArray();
        assertThat(results).isEqualTo(new long[] {1, 1, 1});

        // by file num: 4 runs > maxRunNum=3 -> compact the newest 2 runs
        pick = compaction.pick(3, level0(1, 2, 3, 50));
        assertThat(pick.isPresent()).isTrue();
        results = pick.get().files().stream().mapToLong(SstFileMeta::fileSize).toArray();
        assertThat(results).isEqualTo(new long[] {1, 2});
    }

    @Test
    public void testSizeAmplification() {
        // With maxRunNum=1 the amplification check runs on every append.
        UniversalCompaction compaction = new UniversalCompaction(25, 0, 1);
        long[] sizes = new long[] {1};
        sizes = appendAndPickForSizeAmp(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {2});
        sizes = appendAndPickForSizeAmp(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {3});
        sizes = appendAndPickForSizeAmp(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {4});
        sizes = appendAndPickForSizeAmp(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {1, 4});
        sizes = appendAndPickForSizeAmp(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {6});
        sizes = appendAndPickForSizeAmp(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {1, 6});
        sizes = appendAndPickForSizeAmp(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {8});
        sizes = appendAndPickForSizeAmp(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {1, 8});
        sizes = appendAndPickForSizeAmp(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {1, 1, 8});
        sizes = appendAndPickForSizeAmp(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {11});
        sizes = appendAndPickForSizeAmp(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {1, 11});
        sizes = appendAndPickForSizeAmp(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {1, 1, 11});
        sizes = appendAndPickForSizeAmp(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {14});
        sizes = appendAndPickForSizeAmp(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {1, 14});
        sizes = appendAndPickForSizeAmp(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {1, 1, 14});
        sizes = appendAndPickForSizeAmp(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {1, 1, 1, 14});
        sizes = appendAndPickForSizeAmp(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {18});
    }

    @Test
    public void testSizeRatio() {
        // sizeRatio=1%, maxRunNum=5: ratio compaction only kicks in at 5 runs.
        UniversalCompaction compaction = new UniversalCompaction(25, 1, 5);
        long[] sizes = new long[] {1, 1, 1, 1};
        sizes = appendAndPickForSizeRatio(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {5});
        sizes = appendAndPickForSizeRatio(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {1, 5});
        sizes = appendAndPickForSizeRatio(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {1, 1, 5});
        sizes = appendAndPickForSizeRatio(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {1, 1, 1, 5});
        sizes = appendAndPickForSizeRatio(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {4, 5});
        sizes = appendAndPickForSizeRatio(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {1, 4, 5});
        sizes = appendAndPickForSizeRatio(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {1, 1, 4, 5});
        sizes = appendAndPickForSizeRatio(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {3, 4, 5});
        sizes = appendAndPickForSizeRatio(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {1, 3, 4, 5});
        sizes = appendAndPickForSizeRatio(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {2, 3, 4, 5});
        sizes = appendAndPickForSizeRatio(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {1, 2, 3, 4, 5});
        sizes = appendAndPickForSizeRatio(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {16});
        sizes = appendAndPickForSizeRatio(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {1, 16});
        sizes = appendAndPickForSizeRatio(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {1, 1, 16});
        sizes = appendAndPickForSizeRatio(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {1, 1, 1, 16});
        sizes = appendAndPickForSizeRatio(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {4, 16});
        sizes = appendAndPickForSizeRatio(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {1, 4, 16});
        sizes = appendAndPickForSizeRatio(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {1, 1, 4, 16});
        sizes = appendAndPickForSizeRatio(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {3, 4, 16});
        sizes = appendAndPickForSizeRatio(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {1, 3, 4, 16});
        sizes = appendAndPickForSizeRatio(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {2, 3, 4, 16});
        sizes = appendAndPickForSizeRatio(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {1, 2, 3, 4, 16});
        sizes = appendAndPickForSizeRatio(compaction, sizes);
        assertThat(sizes).isEqualTo(new long[] {11, 16});
    }

    @Test
    public void testSizeRatioThreshold() {
        // 8 * 1.10 < 9 -> ratio 10% does not merge; 8 * 1.20 >= 9 -> ratio 20% merges all.
        long[] sizes = new long[] {8, 9, 10};
        assertThat(pickForSizeRatio(new UniversalCompaction(25, 10, 2), sizes))
                .isEqualTo(new long[] {8, 9, 10});
        assertThat(pickForSizeRatio(new UniversalCompaction(25, 20, 2), sizes))
                .isEqualTo(new long[] {27});
    }

    /** Creates one single-file run (size 1) per given level. */
    private List<LevelSortedRun> createLevels(int... levels) {
        List<LevelSortedRun> runs = new ArrayList<>();
        for (int size : levels) {
            runs.add(new LevelSortedRun(size, SortedRun.fromSingle(file(1))));
        }
        return runs;
    }

    /**
     * Prepends a size-1 run, runs the size-amplification check, and returns the
     * resulting run sizes (a single merged total if compaction fired).
     */
    private long[] appendAndPickForSizeAmp(UniversalCompaction compaction, long... sizes) {
        sizes = addSize(sizes);
        CompactUnit unit = compaction.pickForSizeAmp(3, level0(sizes));
        if (unit != null) {
            return new long[] {
                unit.files().stream().mapToLong(SstFileMeta::fileSize).reduce(Long::sum).getAsLong()
            };
        }
        return sizes;
    }

    /** Prepends a size-1 run, then applies the size-ratio check. */
    private long[] appendAndPickForSizeRatio(UniversalCompaction compaction, long... sizes) {
        return pickForSizeRatio(compaction, addSize(sizes));
    }

    /**
     * Runs the size-ratio check and returns the resulting run sizes: picked runs
     * are replaced by their merged total at the front, the rest are unchanged.
     */
    private long[] pickForSizeRatio(UniversalCompaction compaction, long... sizes) {
        CompactUnit unit = compaction.pickForSizeRatio(3, level0(sizes));
        if (unit != null) {
            List<Long> compact =
                    unit.files().stream().map(SstFileMeta::fileSize).collect(Collectors.toList());
            List<Long> result = new ArrayList<>();
            for (long size : sizes) {
                result.add(size);
            }
            // Remove the compacted sizes and put their sum at the front.
            compact.forEach(result::remove);
            result.add(0, compact.stream().reduce(Long::sum).get());
            return result.stream().mapToLong(Long::longValue).toArray();
        }
        return sizes;
    }

    /** Prepends a size-1 entry, simulating a newly flushed run. */
    private long[] addSize(long... sizes) {
        long[] newSizes = new long[sizes.length + 1];
        newSizes[0] = 1;
        System.arraycopy(sizes, 0, newSizes, 1, sizes.length);
        return newSizes;
    }

    /** Creates level-0 runs, one single-file run per given size. */
    private List<LevelSortedRun> level0(long... sizes) {
        List<LevelSortedRun> runs = new ArrayList<>();
        for (Long size : sizes) {
            runs.add(new LevelSortedRun(0, SortedRun.fromSingle(file(size))));
        }
        return runs;
    }

    /** Creates a dummy file meta carrying only the given size. */
    private SstFileMeta file(long size) {
        return new SstFileMeta("", size, 1, null, null, null, 0, 0, 0);
    }
}

Reply via email to