This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-0.2 in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/release-0.2 by this push: new 95bb7a54 [FLINK-28482] num-sorted-run.stop-trigger introduced a unstable merging 95bb7a54 is described below commit 95bb7a54d62e6b02492d33381cfb9d73ee2665fe Author: Nicholas Jiang <programg...@163.com> AuthorDate: Wed Jul 20 18:19:35 2022 +0800 [FLINK-28482] num-sorted-run.stop-trigger introduced a unstable merging This closes #216 --- .../shortcodes/generated/core_configuration.html | 8 +++- .../org/apache/flink/table/store/CoreOptions.java | 17 +++++++- .../mergetree/compact/UniversalCompaction.java | 13 ++++-- .../file/operation/KeyValueFileStoreWrite.java | 3 +- .../table/store/file/mergetree/MergeTreeTest.java | 3 +- .../mergetree/compact/UniversalCompactionTest.java | 49 +++++++++++++++++----- 6 files changed, 75 insertions(+), 18 deletions(-) diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 09cdf87e..c985f08b 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -44,6 +44,12 @@ <td>Integer</td> <td>The size amplification is defined as the amount (in percentage) of additional storage needed to store a single byte of data in the merge tree for changelog mode table.</td> </tr> + <tr> + <td><h5>compaction.max-sorted-run-num</h5></td> + <td style="word-wrap: break-word;">2147483647</td> + <td>Integer</td> + <td>The maximum sorted run number to pick for compaction. 
This value avoids merging too much sorted runs at the same time during compaction, which may lead to OutOfMemoryError.</td> + </tr> <tr> <td><h5>compaction.min.file-num</h5></td> <td style="word-wrap: break-word;">5</td> @@ -150,7 +156,7 @@ <td><h5>num-sorted-run.stop-trigger</h5></td> <td style="word-wrap: break-word;">10</td> <td>Integer</td> - <td>The number of sorted-runs that trigger the stopping of writes.</td> + <td>The number of sorted runs that trigger the stopping of writes.</td> </tr> <tr> <td><h5>page-size</h5></td> diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java index 3cc3ff27..8a7a5362 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java @@ -193,7 +193,7 @@ public class CoreOptions implements Serializable { .intType() .defaultValue(10) .withDescription( - "The number of sorted-runs that trigger the stopping of writes."); + "The number of sorted runs that trigger the stopping of writes."); public static final ConfigOption<Integer> NUM_LEVELS = ConfigOptions.key("num-levels") @@ -244,6 +244,15 @@ public class CoreOptions implements Serializable { + "for append-only table, even if sum(size(f_i)) < targetFileSize. This value " + "avoids pending too much small files, which slows down the performance."); + public static final ConfigOption<Integer> COMPACTION_MAX_SORTED_RUN_NUM = + ConfigOptions.key("compaction.max-sorted-run-num") + .intType() + .defaultValue(Integer.MAX_VALUE) + .withDescription( + "The maximum sorted run number to pick for compaction. 
" + + "This value avoids merging too much sorted runs at the same time during compaction, " + + "which may lead to OutOfMemoryError."); + public static final ConfigOption<Boolean> CHANGELOG_FILE = ConfigOptions.key("changelog-file") .booleanType() @@ -405,7 +414,7 @@ public class CoreOptions implements Serializable { // By default, this ensures that the compaction does not fall to level 0, but at least to // level 1 Integer numLevels = options.get(NUM_LEVELS); - numLevels = numLevels == null ? numSortedRunCompactionTrigger() + 1 : numLevels; + numLevels = numLevels == null ? numSortedRunStopTrigger() + 1 : numLevels; return numLevels; } @@ -429,6 +438,10 @@ public class CoreOptions implements Serializable { return options.get(COMPACTION_MAX_FILE_NUM); } + public int maxSortedRunNum() { + return options.get(COMPACTION_MAX_SORTED_RUN_NUM); + } + public boolean enableChangelogFile() { return options.get(CHANGELOG_FILE); } diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java index f4217431..ce5eac10 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java @@ -43,11 +43,14 @@ public class UniversalCompaction implements CompactStrategy { private final int maxSizeAmp; private final int sizeRatio; private final int numRunCompactionTrigger; + private final int maxSortedRunNum; - public UniversalCompaction(int maxSizeAmp, int sizeRatio, int numRunCompactionTrigger) { + public UniversalCompaction( + int maxSizeAmp, int sizeRatio, int numRunCompactionTrigger, int maxSortedRunNum) { this.maxSizeAmp = maxSizeAmp; this.sizeRatio = sizeRatio; this.numRunCompactionTrigger = numRunCompactionTrigger; + 
this.maxSortedRunNum = maxSortedRunNum; } @Override @@ -130,7 +133,7 @@ public class UniversalCompaction implements CompactStrategy { } if (candidateCount > 1) { - return createUnit(runs, maxLevel, candidateCount); + return createUnit(runs, maxLevel, candidateCount, maxSortedRunNum); } return null; @@ -145,8 +148,12 @@ public class UniversalCompaction implements CompactStrategy { } @VisibleForTesting - static CompactUnit createUnit(List<LevelSortedRun> runs, int maxLevel, int runCount) { + static CompactUnit createUnit( + List<LevelSortedRun> runs, int maxLevel, int runCount, int maxSortedRunNum) { int outputLevel; + if (runCount > maxSortedRunNum) { + runCount = maxSortedRunNum; + } if (runCount == runs.size()) { outputLevel = maxLevel; } else { diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java index 574c7eb0..e44f435a 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java @@ -140,7 +140,8 @@ public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> { new UniversalCompaction( options.maxSizeAmplificationPercent(), options.sortedRunSizeRatio(), - options.numSortedRunCompactionTrigger()), + options.numSortedRunCompactionTrigger(), + options.maxSortedRunNum()), compactExecutor, levels), levels, diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java index 8a5793d4..f1067545 100644 --- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java +++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java @@ -292,7 +292,8 @@ public class MergeTreeTest { new UniversalCompaction( options.maxSizeAmplificationPercent(), options.sortedRunSizeRatio(), - options.numSortedRunCompactionTrigger()); + options.numSortedRunCompactionTrigger(), + options.maxSortedRunNum()); CompactRewriter rewriter = (outputLevel, dropDelete, sections) -> dataFileWriter.write( diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java index a42ede9c..6a8b3793 100644 --- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java @@ -38,16 +38,21 @@ public class UniversalCompactionTest { @Test public void testOutputLevel() { - assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 1).outputLevel()).isEqualTo(0); - assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 2).outputLevel()).isEqualTo(0); - assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 3).outputLevel()).isEqualTo(2); - assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 4).outputLevel()).isEqualTo(3); - assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 5).outputLevel()).isEqualTo(5); + assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 1, Integer.MAX_VALUE).outputLevel()) + .isEqualTo(0); + assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 2, Integer.MAX_VALUE).outputLevel()) + .isEqualTo(0); + assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 3, Integer.MAX_VALUE).outputLevel()) + .isEqualTo(2); + assertThat(createUnit(createLevels(0, 0, 1, 3, 4), 5, 4, Integer.MAX_VALUE).outputLevel()) + .isEqualTo(3); + assertThat(createUnit(createLevels(0, 
0, 1, 3, 4), 5, 5, Integer.MAX_VALUE).outputLevel()) + .isEqualTo(5); } @Test public void testPick() { - UniversalCompaction compaction = new UniversalCompaction(25, 1, 3); + UniversalCompaction compaction = new UniversalCompaction(25, 1, 3, Integer.MAX_VALUE); // by size amplification Optional<CompactUnit> pick = compaction.pick(3, level0(1, 2, 3, 3)); @@ -69,9 +74,33 @@ public class UniversalCompactionTest { assertThat(results).isEqualTo(new long[] {1, 2, 3}); } + @Test + public void testPickWithMaxSortedRunNum() { + UniversalCompaction compaction = new UniversalCompaction(25, 1, 3, 2); + + // by size amplification + Optional<CompactUnit> pick = compaction.pick(3, level0(1, 2, 3, 3)); + assertThat(pick.isPresent()).isTrue(); + long[] results = pick.get().files().stream().mapToLong(DataFileMeta::fileSize).toArray(); + assertThat(results).isEqualTo(new long[] {1, 2, 3, 3}); + + // by size ratio + pick = compaction.pick(3, level0(1, 1, 1, 50)); + assertThat(pick.isPresent()).isTrue(); + results = pick.get().files().stream().mapToLong(DataFileMeta::fileSize).toArray(); + assertThat(results).isEqualTo(new long[] {1, 1}); + + // by file num + pick = compaction.pick(3, level0(1, 2, 3, 50)); + assertThat(pick.isPresent()).isTrue(); + results = pick.get().files().stream().mapToLong(DataFileMeta::fileSize).toArray(); + // 3 is excluded here: the pick is capped at maxSortedRunNum = 2 runs, so only {1, 2} remain + assertThat(results).isEqualTo(new long[] {1, 2}); + } + @Test public void testSizeAmplification() { - UniversalCompaction compaction = new UniversalCompaction(25, 0, 1); + UniversalCompaction compaction = new UniversalCompaction(25, 0, 1, Integer.MAX_VALUE); long[] sizes = new long[] {1}; sizes = appendAndPickForSizeAmp(compaction, sizes); assertThat(sizes).isEqualTo(new long[] {2}); @@ -111,7 +140,7 @@ public class UniversalCompactionTest { @Test public void testSizeRatio() { - UniversalCompaction compaction = new UniversalCompaction(25, 1, 5); + UniversalCompaction compaction = new 
UniversalCompaction(25, 1, 5, Integer.MAX_VALUE); long[] sizes = new long[] {1, 1, 1, 1}; sizes = appendAndPickForSizeRatio(compaction, sizes); assertThat(sizes).isEqualTo(new long[] {5}); @@ -164,9 +193,9 @@ public class UniversalCompactionTest { @Test public void testSizeRatioThreshold() { long[] sizes = new long[] {8, 9, 10}; - assertThat(pickForSizeRatio(new UniversalCompaction(25, 10, 2), sizes)) + assertThat(pickForSizeRatio(new UniversalCompaction(25, 10, 2, Integer.MAX_VALUE), sizes)) .isEqualTo(new long[] {8, 9, 10}); - assertThat(pickForSizeRatio(new UniversalCompaction(25, 20, 2), sizes)) + assertThat(pickForSizeRatio(new UniversalCompaction(25, 20, 2, Integer.MAX_VALUE), sizes)) .isEqualTo(new long[] {27}); }