This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git

The following commit(s) were added to refs/heads/master by this push:
     new 9f760307  [FLINK-27708] Add background compaction task for append-only table when ingesting
9f760307 is described below

commit 9f760307f9da361ddb4ee786a9f3ab1651db149c
Author: Jane Chan <55568005+ladyfor...@users.noreply.github.com>
AuthorDate: Mon Jul 11 11:53:55 2022 +0800

    [FLINK-27708] Add background compaction task for append-only table when ingesting

    This closes #182
---
 .../shortcodes/generated/core_configuration.html   |  18 +-
 ...lyTableTest.java => AppendOnlyTableITCase.java} |  90 +++++++--
 .../connector/sink/CommittableSerializerTest.java  |   2 +-
 .../sink/FileCommittableSerializerTest.java        |   2 +-
 .../source/FileStoreSourceReaderTest.java          |   2 +-
 .../source/FileStoreSourceSplitGeneratorTest.java  |   2 +-
 .../source/FileStoreSourceSplitReaderTest.java     |   2 +-
 .../source/FileStoreSourceSplitSerializerTest.java |   2 +-
 .../source/FileStoreSourceSplitStateTest.java      |   2 +-
 .../PendingSplitsCheckpointSerializerTest.java     |   2 +-
 .../source/StaticFileStoreSplitEnumeratorTest.java |   2 +-
 .../source/TestChangelogDataReadWrite.java         |   5 +-
 .../org/apache/flink/table/store/CoreOptions.java  |  33 +++-
 .../table/store/file/AppendOnlyFileStore.java      |   6 +-
 .../flink/table/store/file/KeyValueFileStore.java  |   1 -
 .../table/store/file/compact/CompactManager.java   |  55 ++++++
 .../{mergetree => }/compact/CompactResult.java     |   2 +-
 .../table/store/file/compact/CompactTask.java      |  75 +++++++
 .../file/{mergetree => }/compact/CompactUnit.java  |   2 +-
 .../store/file/data/AppendOnlyCompactManager.java  | 131 +++++++++++++
 .../table/store/file/data/AppendOnlyWriter.java    | 188 ++++++++++++++----
 .../store/file/mergetree/MergeTreeWriter.java      |   8 +-
 .../file/mergetree/compact/CompactStrategy.java    |   1 +
 ...ctManager.java => MergeTreeCompactManager.java} |  50 ++---
 ...{CompactTask.java => MergeTreeCompactTask.java} | 116 ++++-------
 .../mergetree/compact/UniversalCompaction.java     |   1 +
 .../file/operation/AbstractFileStoreScan.java      |  12 +-
 .../file/operation/AppendOnlyFileStoreWrite.java   |  69 ++++++-
 .../table/store/file/operation/FileStoreWrite.java |   2 +-
 .../file/operation/KeyValueFileStoreWrite.java     |  37 ++--
 .../flink/table/store/table/sink/TableCompact.java |   2 +-
 .../file/data/AppendOnlyCompactManagerTest.java    | 216 +++++++++++++++++++++
 .../store/file/data/AppendOnlyWriterTest.java      | 158 ++++++++++++---
 .../table/store/file/data/DataFileTestUtils.java   |  15 ++
 .../store/file/format/FileFormatSuffixTest.java    |  13 +-
 .../ManifestCommittableSerializerTest.java         |   2 +-
 .../table/store/file/mergetree/LevelsTest.java     |   2 +-
 .../table/store/file/mergetree/MergeTreeTest.java  |  25 ++-
 ...rTest.java => MergeTreeCompactManagerTest.java} |  15 +-
 .../mergetree/compact/UniversalCompactionTest.java |   1 +
 .../flink/table/store/kafka/KafkaLogTestUtils.java |   2 +-
 41 files changed, 1131 insertions(+), 240 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 304e7585..09cdf87e 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -32,17 +32,29 @@
             <td>Boolean</td>
             <td>Whether to force a compaction before commit.</td>
         </tr>
+        <tr>
+            <td><h5>compaction.early-max.file-num</h5></td>
+            <td style="word-wrap: break-word;">50</td>
+            <td>Integer</td>
+            <td>For file set [f_0,...,f_N], the maximum file number to trigger a compaction for an append-only table, even if sum(size(f_i)) < targetFileSize. This value avoids keeping too many small files pending, which slows down performance.</td>
+        </tr>
         <tr>
             <td><h5>compaction.max-size-amplification-percent</h5></td>
             <td style="word-wrap: break-word;">200</td>
             <td>Integer</td>
-            <td>The size amplification is defined as the amount (in percentage) of additional storage needed to store a single byte of data in the merge tree.</td>
+            <td>The size amplification is defined as the amount (in percentage) of additional storage needed to store a single byte of data in the merge tree for a changelog-mode table.</td>
+        </tr>
+        <tr>
+            <td><h5>compaction.min.file-num</h5></td>
+            <td style="word-wrap: break-word;">5</td>
+            <td>Integer</td>
+            <td>For file set [f_0,...,f_N], the minimum file number which satisfies sum(size(f_i)) >= targetFileSize to trigger a compaction for an append-only table. This value avoids compacting almost-full files, which is not cost-effective.</td>
         </tr>
         <tr>
             <td><h5>compaction.size-ratio</h5></td>
             <td style="word-wrap: break-word;">1</td>
             <td>Integer</td>
-            <td>Percentage flexibility while comparing sorted run size. If the candidate sorted run(s) size is 1% smaller than the next sorted run's size, then include next sorted run into this candidate set.</td>
+            <td>Percentage flexibility while comparing sorted run size for a changelog-mode table. If the candidate sorted run(s) size is 1% smaller than the next sorted run's size, then include next sorted run into this candidate set.</td>
         </tr>
         <tr>
             <td><h5>continuous.discovery-interval</h5></td>
@@ -196,7 +208,7 @@
         </tr>
         <tr>
             <td><h5>write-buffer-size</h5></td>
-            <td style="word-wrap: break-word;">256 mb</td>
+            <td style="word-wrap: break-word;">128 mb</td>
             <td>MemorySize</td>
             <td>Amount of data to build up in memory before converting to a sorted on-disk file.</td>
         </tr>
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableITCase.java
similarity index 67%
rename from flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableTest.java
rename to flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableITCase.java
index 682c623c..70f06993 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.store.connector;
 
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 
@@ -33,7 +34,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test case for append-only managed table.
*/ -public class AppendOnlyTableTest extends FileStoreTableITCase { +public class AppendOnlyTableITCase extends FileStoreTableITCase { @Test public void testCreateTableWithPrimaryKey() { @@ -124,17 +125,61 @@ public class AppendOnlyTableTest extends FileStoreTableITCase { Row.of("AAA"), Row.of("AAA"), Row.of("BBB"), Row.of("AAA")); } - private void testRejectChanges(RowKind kind) { - List<Row> input = Collections.singletonList(Row.ofKind(kind, 1, "AAA")); - - String id = TestValuesTableFactory.registerData(input); - batchSql( - "CREATE TABLE source (id INT, data STRING) WITH ('connector'='values', 'bounded'='true', 'data-id'='%s')", - id); + @Test + public void testAutoCompaction() { + batchSql("ALTER TABLE append_table SET ('commit.force-compact' = 'true')"); + batchSql("ALTER TABLE append_table SET ('compaction.min.file-num' = '2')"); + batchSql("ALTER TABLE append_table SET ('compaction.early-max.file-num' = '4')"); + + assertAutoCompaction( + "INSERT INTO append_table VALUES (1, 'AAA'), (2, 'BBB')", + 1L, + Snapshot.CommitKind.APPEND); + assertAutoCompaction( + "INSERT INTO append_table VALUES (3, 'CCC'), (4, 'DDD')", + 2L, + Snapshot.CommitKind.APPEND); + assertAutoCompaction( + "INSERT INTO append_table VALUES (1, 'AAA'), (2, 'BBB'), (3, 'CCC'), (4, 'DDD')", + 3L, + Snapshot.CommitKind.APPEND); + assertAutoCompaction( + "INSERT INTO append_table VALUES (5, 'EEE'), (6, 'FFF')", + 5L, + Snapshot.CommitKind.COMPACT); + assertAutoCompaction( + "INSERT INTO append_table VALUES (7, 'HHH'), (8, 'III')", + 6L, + Snapshot.CommitKind.APPEND); + assertAutoCompaction( + "INSERT INTO append_table VALUES (9, 'JJJ'), (10, 'KKK')", + 7L, + Snapshot.CommitKind.APPEND); + assertAutoCompaction( + "INSERT INTO append_table VALUES (11, 'LLL'), (12, 'MMM')", + 9L, + Snapshot.CommitKind.COMPACT); - assertThatThrownBy(() -> batchSql("INSERT INTO append_table SELECT * FROM source")) - .hasRootCauseInstanceOf(IllegalStateException.class) - .hasRootCauseMessage("Append only writer can not accept row with RowKind %s", kind); + List<Row> rows = batchSql("SELECT * FROM append_table"); + assertThat(rows.size()).isEqualTo(16); + assertThat(rows) + .containsExactlyInAnyOrder( + Row.of(1, "AAA"), + Row.of(2, "BBB"), + Row.of(3, "CCC"), + Row.of(4, "DDD"), + Row.of(1, "AAA"), + Row.of(2, "BBB"), + Row.of(3, "CCC"), + Row.of(4, "DDD"), + Row.of(5, "EEE"), + Row.of(6, "FFF"), + Row.of(7, "HHH"), + Row.of(8, "III"), + Row.of(9, "JJJ"), + Row.of(10, "KKK"), + Row.of(11, "LLL"), + Row.of(12, "MMM")); } @Test @@ -155,6 +200,27 @@ public class AppendOnlyTableTest extends FileStoreTableITCase { @Override protected List<String> ddl() { return Collections.singletonList( - "CREATE TABLE IF NOT EXISTS append_table (id INT, data STRING) WITH ('write-mode'='append-only')"); + "CREATE TABLE IF NOT EXISTS append_table (id INT, data STRING) WITH ('write-mode'='append-only', 'commit.force-compact' = 'true')"); + } + + private void testRejectChanges(RowKind kind) { + List<Row> input = Collections.singletonList(Row.ofKind(kind, 1, "AAA")); + + String id = TestValuesTableFactory.registerData(input); + batchSql( + "CREATE TABLE source (id INT, data STRING) WITH ('connector'='values', 'bounded'='true', 'data-id'='%s')", + id); + + assertThatThrownBy(() -> batchSql("INSERT INTO append_table SELECT * FROM source")) + .hasRootCauseInstanceOf(IllegalStateException.class) + .hasRootCauseMessage("Append only writer can not accept row with RowKind %s", kind); + } + + private void assertAutoCompaction( + String sql, long expectedSnapshotId, 
Snapshot.CommitKind expectedCommitKind) { + batchSql(sql); + Snapshot snapshot = findLatestSnapshot("append_table", true); + assertThat(snapshot.id()).isEqualTo(expectedSnapshotId); + assertThat(snapshot.commitKind()).isEqualTo(expectedCommitKind); } } diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java index 03b5e614..7752421c 100644 --- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java +++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java @@ -26,7 +26,7 @@ import org.junit.jupiter.api.Test; import java.io.IOException; import static org.apache.flink.table.store.file.manifest.ManifestCommittableSerializerTest.randomIncrement; -import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row; +import static org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactManagerTest.row; import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link CommittableSerializer}. */ diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/FileCommittableSerializerTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/FileCommittableSerializerTest.java index 73b66f03..0b010cf4 100644 --- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/FileCommittableSerializerTest.java +++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/FileCommittableSerializerTest.java @@ -26,7 +26,7 @@ import org.junit.jupiter.api.Test; import java.io.IOException; import static org.apache.flink.table.store.file.manifest.ManifestCommittableSerializerTest.randomIncrement; -import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row; +import static org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactManagerTest.row; import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link FileCommittableSerializer}. */ diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java index 626bdc38..6025b735 100644 --- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java +++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java @@ -26,7 +26,7 @@ import org.junit.jupiter.api.io.TempDir; import java.util.Collections; import static org.apache.flink.table.store.connector.source.FileStoreSourceSplitSerializerTest.newSourceSplit; -import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row; +import static org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactManagerTest.row; import static org.assertj.core.api.Assertions.assertThat; /** Unit tests for the {@link FileStoreSourceReader}. 
*/ diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java index d25e9119..aece5b87 100644 --- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java +++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java @@ -37,7 +37,7 @@ import java.util.Comparator; import java.util.List; import java.util.stream.Collectors; -import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row; +import static org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactManagerTest.row; import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link FileStoreSourceSplitGenerator}. */ diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java index 258f8da6..214fbca8 100644 --- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java +++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java @@ -45,7 +45,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import static org.apache.flink.table.store.connector.source.FileStoreSourceSplitSerializerTest.newSourceSplit; -import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row; +import static org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactManagerTest.row; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializerTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializerTest.java index 90c595e4..863a2047 100644 --- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializerTest.java +++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializerTest.java @@ -30,7 +30,7 @@ import java.io.IOException; import java.util.Arrays; import java.util.List; -import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row; +import static org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactManagerTest.row; import static org.assertj.core.api.Assertions.assertThat; /** Unit tests for the {@link FileStoreSourceSplitSerializer}. 
*/ diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitStateTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitStateTest.java index e6c72d63..539486b2 100644 --- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitStateTest.java +++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitStateTest.java @@ -26,7 +26,7 @@ import java.util.Arrays; import static org.apache.flink.table.store.connector.source.FileStoreSourceSplitSerializerTest.newFile; import static org.apache.flink.table.store.connector.source.FileStoreSourceSplitSerializerTest.newSourceSplit; -import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row; +import static org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactManagerTest.row; import static org.assertj.core.api.Assertions.assertThat; /** Unit tests for the {@link FileStoreSourceSplitState}. */ diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializerTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializerTest.java index a3ab6a2f..0a5a3862 100644 --- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializerTest.java +++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializerTest.java @@ -28,7 +28,7 @@ import java.util.Collections; import static org.apache.flink.table.store.connector.source.FileStoreSourceSplitSerializerTest.newFile; import static org.apache.flink.table.store.connector.source.FileStoreSourceSplitSerializerTest.newSourceSplit; -import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row; +import static org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactManagerTest.row; import static org.assertj.core.api.Assertions.assertThat; /** Unit tests for the {@link PendingSplitsCheckpointSerializer}. */ diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumeratorTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumeratorTest.java index 6a60b140..8b28c155 100644 --- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumeratorTest.java +++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumeratorTest.java @@ -27,7 +27,7 @@ import java.util.Arrays; import static org.apache.flink.table.store.connector.source.FileStoreSourceSplitSerializerTest.newFile; import static org.apache.flink.table.store.connector.source.FileStoreSourceSplitSerializerTest.newSourceSplit; -import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row; +import static org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactManagerTest.row; import static org.assertj.core.api.Assertions.assertThat; /** Unit tests for the {@link StaticFileStoreSplitEnumerator}. 
*/
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
index 16134667..94a02856 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
@@ -49,6 +49,7 @@ import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Preconditions;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
@@ -139,7 +140,8 @@ public class TestChangelogDataReadWrite {
     }
 
     public RecordWriter<KeyValue> createMergeTreeWriter(BinaryRowData partition, int bucket) {
-        CoreOptions options = new CoreOptions(new Configuration());
+        CoreOptions options =
+                new CoreOptions(Collections.singletonMap(CoreOptions.FILE_FORMAT.key(), "avro"));
         RecordWriter<KeyValue> writer =
                 new KeyValueFileStoreWrite(
                                 new SchemaManager(tablePath),
@@ -148,7 +150,6 @@ public class TestChangelogDataReadWrite {
                                 VALUE_TYPE,
                                 () -> COMPARATOR,
                                 new DeduplicateMergeFunction(),
-                                avro,
                                 pathFactory,
                                 snapshotManager,
                                 null, // not used, we only create an empty writer
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index a45b5b6f..3cc3ff27 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -214,17 +214,36 @@ public class CoreOptions implements Serializable {
                     .defaultValue(200)
                     .withDescription(
                             "The size amplification is defined as the amount (in percentage) of additional storage "
-                                    + "needed to store a single byte of data in the merge tree.");
+                                    + "needed to store a single byte of data in the merge tree for a changelog-mode table.");
 
     public static final ConfigOption<Integer> COMPACTION_SIZE_RATIO =
             ConfigOptions.key("compaction.size-ratio")
                     .intType()
                     .defaultValue(1)
                     .withDescription(
-                            "Percentage flexibility while comparing sorted run size. If the candidate sorted run(s) "
+                            "Percentage flexibility while comparing sorted run size for a changelog-mode table. If the candidate sorted run(s) "
                                     + "size is 1% smaller than the next sorted run's size, then include next sorted run "
                                     + "into this candidate set.");
 
+    public static final ConfigOption<Integer> COMPACTION_MIN_FILE_NUM =
+            ConfigOptions.key("compaction.min.file-num")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "For file set [f_0,...,f_N], the minimum file number which satisfies "
+                                    + "sum(size(f_i)) >= targetFileSize to trigger a compaction for an "
+                                    + "append-only table. This value avoids compacting almost-full files, "
+                                    + "which is not cost-effective.");
+
+    public static final ConfigOption<Integer> COMPACTION_MAX_FILE_NUM =
+            ConfigOptions.key("compaction.early-max.file-num")
+                    .intType()
+                    .defaultValue(50)
+                    .withDescription(
+                            "For file set [f_0,...,f_N], the maximum file number to trigger a compaction "
+                                    + "for an append-only table, even if sum(size(f_i)) < targetFileSize. This value "
+                                    + "avoids keeping too many small files pending, which slows down performance.");
+
     public static final ConfigOption<Boolean> CHANGELOG_FILE =
             ConfigOptions.key("changelog-file")
                     .booleanType()
@@ -398,10 +417,18 @@ public class CoreOptions implements Serializable {
         return options.get(COMPACTION_MAX_SIZE_AMPLIFICATION_PERCENT);
     }
 
-    public int sizeRatio() {
+    public int sortedRunSizeRatio() {
         return options.get(COMPACTION_SIZE_RATIO);
     }
 
+    public int minFileNum() {
+        return options.get(COMPACTION_MIN_FILE_NUM);
+    }
+
+    public int maxFileNum() {
+        return options.get(COMPACTION_MAX_FILE_NUM);
+    }
+
     public boolean enableChangelogFile() {
         return options.get(CHANGELOG_FILE);
     }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
index 8a803a74..12ca83cd 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
@@ -58,13 +58,17 @@ public class AppendOnlyFileStore extends AbstractFileStore<RowData> {
     @Override
     public AppendOnlyFileStoreWrite newWrite() {
         return new AppendOnlyFileStoreWrite(
+                newRead(),
                 schemaId,
                 rowType,
                 options.fileFormat(),
                 pathFactory(),
                 snapshotManager(),
                 newScan(true),
-                options.targetFileSize());
+                options.targetFileSize(),
+                options.minFileNum(),
+                options.maxFileNum(),
+                options.commitForceCompact());
     }
 
     private AppendOnlyFileStoreScan newScan(boolean checkNumOfBuckets) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
index 9c7ae86e..77417e99 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
@@ -86,7 +86,6 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
                 valueType,
                 keyComparatorSupplier,
                 mergeFunction,
-                options.fileFormat(),
                 pathFactory(),
                 snapshotManager(),
                 newScan(true),
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java
new file mode 100644
index 00000000..2063e17b
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.flink.table.store.file.compact; + +import java.util.Optional; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +/** Manager to submit compaction task. */ +public abstract class CompactManager { + + protected final ExecutorService executor; + + protected Future<CompactResult> taskFuture; + + public CompactManager(ExecutorService executor) { + this.executor = executor; + } + + /** Submit a new compaction task. */ + public abstract void submitCompaction(); + + public boolean isCompactionFinished() { + return taskFuture == null; + } + + public Optional<CompactResult> finishCompaction(boolean blocking) + throws ExecutionException, InterruptedException { + if (taskFuture != null) { + if (blocking || taskFuture.isDone()) { + CompactResult result = taskFuture.get(); + taskFuture = null; + return Optional.of(result); + } + } + return Optional.empty(); + } +} diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactResult.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactResult.java similarity index 94% rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactResult.java rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactResult.java index e5b6e826..865ac251 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactResult.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactResult.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.table.store.file.mergetree.compact; +package org.apache.flink.table.store.file.compact; import org.apache.flink.table.store.file.data.DataFileMeta; diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactTask.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactTask.java new file mode 100644 index 00000000..a129c28a --- /dev/null +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactTask.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.file.compact; + +import org.apache.flink.table.store.file.data.DataFileMeta; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.Callable; + +/** Compact task. 
*/ +public abstract class CompactTask implements Callable<CompactResult> { + + private static final Logger LOG = LoggerFactory.getLogger(CompactTask.class); + + private final List<DataFileMeta> inputs; + + public CompactTask(List<DataFileMeta> inputs) { + this.inputs = inputs; + } + + @Override + public CompactResult call() throws Exception { + long startMillis = System.currentTimeMillis(); + CompactResult result = doCompact(inputs); + + if (LOG.isDebugEnabled()) { + logMetric(startMillis, result.before(), result.after()); + } + + return result; + } + + protected String logMetric( + long startMillis, List<DataFileMeta> compactBefore, List<DataFileMeta> compactAfter) { + return String.format( + "Done compacting %d files to %d files in %dms. " + + "Rewrite input file size = %d, output file size = %d", + compactBefore.size(), + compactAfter.size(), + System.currentTimeMillis() - startMillis, + collectRewriteSize(compactBefore), + collectRewriteSize(compactAfter)); + } + + /** + * Perform compaction. + * + * @param inputs the candidate files to be compacted + * @return {@link CompactResult} of compact before and compact after files. + */ + protected abstract CompactResult doCompact(List<DataFileMeta> inputs) throws Exception; + + private long collectRewriteSize(List<DataFileMeta> files) { + return files.stream().mapToLong(DataFileMeta::fileSize).sum(); + } +} diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactUnit.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactUnit.java similarity index 96% rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactUnit.java rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactUnit.java index af8510e1..f42b89d2 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactUnit.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactUnit.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.table.store.file.mergetree.compact; +package org.apache.flink.table.store.file.compact; import org.apache.flink.table.store.file.data.DataFileMeta; import org.apache.flink.table.store.file.mergetree.LevelSortedRun; diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyCompactManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyCompactManager.java new file mode 100644 index 00000000..e30b0e43 --- /dev/null +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyCompactManager.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.file.data; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.table.store.file.compact.CompactManager; +import org.apache.flink.table.store.file.compact.CompactResult; +import org.apache.flink.table.store.file.compact.CompactTask; + +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ExecutorService; + +/** Compact manager for {@link org.apache.flink.table.store.file.AppendOnlyFileStore}. */ +public class AppendOnlyCompactManager extends CompactManager { + + private final int minFileNum; + private final int maxFileNum; + private final long targetFileSize; + private final CompactRewriter rewriter; + private final LinkedList<DataFileMeta> toCompact; + + public AppendOnlyCompactManager( + ExecutorService executor, + LinkedList<DataFileMeta> toCompact, + int minFileNum, + int maxFileNum, + long targetFileSize, + CompactRewriter rewriter) { + super(executor); + this.toCompact = toCompact; + this.maxFileNum = maxFileNum; + this.minFileNum = minFileNum; + this.targetFileSize = targetFileSize; + this.rewriter = rewriter; + } + + @Override + public void submitCompaction() { + if (taskFuture != null) { + throw new IllegalStateException( + "Please finish the previous compaction before submitting new one."); + } + pickCompactBefore() + .ifPresent( + (inputs) -> + taskFuture = + executor.submit( + new AppendOnlyCompactTask(inputs, rewriter))); + } + + @VisibleForTesting + Optional<List<DataFileMeta>> pickCompactBefore() { + long totalFileSize = 0L; + int fileNum = 0; + LinkedList<DataFileMeta> candidates = new LinkedList<>(); + + while (!toCompact.isEmpty()) { + DataFileMeta file = toCompact.pollFirst(); + candidates.add(file); + totalFileSize += file.fileSize(); + fileNum++; + if ((totalFileSize >= targetFileSize && fileNum >= minFileNum) + || fileNum >= maxFileNum) { + return Optional.of(candidates); + } else if (totalFileSize >= targetFileSize) { + // left pointer shift one pos to right + DataFileMeta removed = candidates.pollFirst(); + assert removed != null; + totalFileSize -= removed.fileSize(); + fileNum--; + } + } + toCompact.addAll(candidates); + return Optional.empty(); + } + + @VisibleForTesting + LinkedList<DataFileMeta> getToCompact() { + return toCompact; + } + + /** A {@link CompactTask} impl for append-only table. */ + public static class AppendOnlyCompactTask extends CompactTask { + + private final CompactRewriter rewriter; + + public AppendOnlyCompactTask(List<DataFileMeta> toCompact, CompactRewriter rewriter) { + super(toCompact); + this.rewriter = rewriter; + } + + @Override + protected CompactResult doCompact(List<DataFileMeta> inputs) throws Exception { + List<DataFileMeta> compactAfter = rewriter.rewrite(inputs); + return new CompactResult() { + @Override + public List<DataFileMeta> before() { + return inputs; + } + + @Override + public List<DataFileMeta> after() { + return compactAfter; + } + }; + } + } + + /** Compact rewriter for append-only table. 
*/ + public interface CompactRewriter { + List<DataFileMeta> rewrite(List<DataFileMeta> compactBefore) throws Exception; + } +} diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java index 8a7105e7..305a7133 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java @@ -19,6 +19,8 @@ package org.apache.flink.table.store.file.data; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.accumulators.LongCounter; import org.apache.flink.core.fs.Path; import org.apache.flink.table.data.RowData; import org.apache.flink.table.store.file.mergetree.Increment; @@ -34,26 +36,36 @@ import org.apache.flink.table.store.file.writer.RollingFileWriter; import org.apache.flink.table.store.format.FileFormat; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.types.RowKind; +import org.apache.flink.util.CloseableIterator; import org.apache.flink.util.Preconditions; import java.io.IOException; import java.util.ArrayList; +import java.util.LinkedList; import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.function.Function; import java.util.function.Supplier; +import static org.apache.flink.table.store.file.data.AppendOnlyWriter.RowRollingWriter.createRollingRowWriter; + /** * A {@link RecordWriter} implementation that only accepts records which are always insert * operations and don't have any unique keys or sort keys. */ public class AppendOnlyWriter implements RecordWriter<RowData> { + private final long schemaId; + private final FileFormat fileFormat; private final long targetFileSize; + private final RowType writeSchema; private final DataFilePathFactory pathFactory; - private final FieldStatsArraySerializer statsArraySerializer; - - private final FileWriter.Factory<RowData, Metric> fileWriterFactory; - private long nextSeqNum; + private final AppendOnlyCompactManager compactManager; + private final boolean forceCompact; + private final LinkedList<DataFileMeta> toCompact; + private final List<DataFileMeta> compactBefore; + private final List<DataFileMeta> compactAfter; + private final LongCounter seqNumCounter; private RowRollingWriter writer; @@ -62,23 +74,29 @@ public class AppendOnlyWriter implements RecordWriter<RowData> { FileFormat fileFormat, long targetFileSize, RowType writeSchema, - long maxWroteSeqNumber, + LinkedList<DataFileMeta> restoredFiles, + AppendOnlyCompactManager compactManager, + boolean forceCompact, DataFilePathFactory pathFactory) { this.schemaId = schemaId; + this.fileFormat = fileFormat; this.targetFileSize = targetFileSize; + this.writeSchema = writeSchema; this.pathFactory = pathFactory; - this.statsArraySerializer = new FieldStatsArraySerializer(writeSchema); - - // Initialize the file writer factory to write records and generic metric. 
- this.fileWriterFactory = - MetricFileWriter.createFactory( - fileFormat.createWriterFactory(writeSchema), - Function.identity(), + this.compactManager = compactManager; + this.forceCompact = forceCompact; + this.toCompact = restoredFiles; + this.compactBefore = new ArrayList<>(); + this.compactAfter = new ArrayList<>(); + this.seqNumCounter = new LongCounter(getMaxSequenceNumber(restoredFiles) + 1); + this.writer = + createRollingRowWriter( + schemaId, + fileFormat, + targetFileSize, writeSchema, - fileFormat.createStatsExtractor(writeSchema).orElse(null)); - - this.nextSeqNum = maxWroteSeqNumber + 1; - this.writer = createRollingRowWriter(); + pathFactory, + seqNumCounter); } @Override @@ -93,21 +111,32 @@ public class AppendOnlyWriter implements RecordWriter<RowData> { @Override public Increment prepareCommit() throws Exception { List<DataFileMeta> newFiles = new ArrayList<>(); - if (writer != null) { writer.close(); newFiles.addAll(writer.result()); // Reopen the writer to accept further records. - writer = createRollingRowWriter(); + seqNumCounter.resetLocal(); + seqNumCounter.add(getMaxSequenceNumber(newFiles) + 1); + writer = + createRollingRowWriter( + schemaId, + fileFormat, + targetFileSize, + writeSchema, + pathFactory, + seqNumCounter); } - - return Increment.forAppend(newFiles); + // add new generated files + toCompact.addAll(newFiles); + submitCompaction(); + finishCompaction(forceCompact); + return drainIncrement(newFiles); } @Override public void sync() throws Exception { - // Do nothing here, as this writer don't introduce any async compaction thread currently. + finishCompaction(true); } @Override @@ -126,31 +155,124 @@ public class AppendOnlyWriter implements RecordWriter<RowData> { return result; } - private RowRollingWriter createRollingRowWriter() { - return new RowRollingWriter( - () -> new RowFileWriter(fileWriterFactory, pathFactory.newPath()), targetFileSize); + private static long getMaxSequenceNumber(List<DataFileMeta> fileMetas) { + return fileMetas.stream() + .map(DataFileMeta::maxSequenceNumber) + .max(Long::compare) + .orElse(-1L); + } + + private void submitCompaction() throws ExecutionException, InterruptedException { + finishCompaction(false); + if (compactManager.isCompactionFinished() && !toCompact.isEmpty()) { + compactManager.submitCompaction(); + } + } + + private void finishCompaction(boolean blocking) + throws ExecutionException, InterruptedException { + compactManager + .finishCompaction(blocking) + .ifPresent( + result -> { + compactBefore.addAll(result.before()); + compactAfter.addAll(result.after()); + if (!result.after().isEmpty()) { + // if the last compacted file is still small, + // add it back to the head + DataFileMeta lastFile = + result.after().get(result.after().size() - 1); + if (lastFile.fileSize() < targetFileSize) { + toCompact.offerFirst(lastFile); + } + } + }); + } + + private Increment drainIncrement(List<DataFileMeta> newFiles) { + Increment increment = + new Increment( + newFiles, new ArrayList<>(compactBefore), new ArrayList<>(compactAfter)); + compactBefore.clear(); + compactAfter.clear(); + return increment; } - private class RowRollingWriter extends RollingFileWriter<RowData, DataFileMeta> { + @VisibleForTesting + List<DataFileMeta> getToCompact() { + return toCompact; + } + + /** Rolling file writer for append-only table. 
*/ + public static class RowRollingWriter extends RollingFileWriter<RowData, DataFileMeta> { public RowRollingWriter(Supplier<RowFileWriter> writerFactory, long targetFileSize) { super(writerFactory, targetFileSize); } + + public static RowRollingWriter createRollingRowWriter( + long schemaId, + FileFormat fileFormat, + long targetFileSize, + RowType writeSchema, + DataFilePathFactory pathFactory, + LongCounter seqNumCounter) { + return new RowRollingWriter( + () -> + new RowFileWriter( + MetricFileWriter.createFactory( + fileFormat.createWriterFactory(writeSchema), + Function.identity(), + writeSchema, + fileFormat + .createStatsExtractor(writeSchema) + .orElse(null)), + pathFactory.newPath(), + writeSchema, + schemaId, + seqNumCounter), + targetFileSize); + } + + public List<DataFileMeta> write(CloseableIterator<RowData> iterator) throws Exception { + try { + super.write(iterator); + super.close(); + return super.result(); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + iterator.close(); + } + } } - private class RowFileWriter extends BaseFileWriter<RowData, DataFileMeta> { - private final long minSeqNum; + /** + * A {@link BaseFileWriter} impl with a counter with an initial value to record each row's + * sequence number. + */ + public static class RowFileWriter extends BaseFileWriter<RowData, DataFileMeta> { - public RowFileWriter(FileWriter.Factory<RowData, Metric> writerFactory, Path path) { + private final FieldStatsArraySerializer statsArraySerializer; + private final long schemaId; + private final LongCounter seqNumCounter; + + public RowFileWriter( + FileWriter.Factory<RowData, Metric> writerFactory, + Path path, + RowType writeSchema, + long schemaId, + LongCounter seqNumCounter) { super(writerFactory, path); - this.minSeqNum = nextSeqNum; + this.statsArraySerializer = new FieldStatsArraySerializer(writeSchema); + this.schemaId = schemaId; + this.seqNumCounter = seqNumCounter; } @Override public void write(RowData row) throws IOException { super.write(row); - - nextSeqNum += 1; + seqNumCounter.add(1L); } @Override @@ -162,8 +284,8 @@ public class AppendOnlyWriter implements RecordWriter<RowData> { FileUtils.getFileSize(path), recordCount(), stats, - minSeqNum, - Math.max(minSeqNum, nextSeqNum - 1), + seqNumCounter.getLocalValue() - super.recordCount(), + seqNumCounter.getLocalValue() - 1, schemaId); } } diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java index 9c5cd32d..50fea94f 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java @@ -22,11 +22,11 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.util.MemorySegmentPool; import org.apache.flink.table.store.file.KeyValue; +import org.apache.flink.table.store.file.compact.CompactManager; +import org.apache.flink.table.store.file.compact.CompactResult; import org.apache.flink.table.store.file.data.DataFileMeta; import org.apache.flink.table.store.file.data.DataFileWriter; import org.apache.flink.table.store.file.memory.MemoryOwner; -import org.apache.flink.table.store.file.mergetree.compact.CompactManager; -import org.apache.flink.table.store.file.mergetree.compact.CompactResult; import 
org.apache.flink.table.store.file.mergetree.compact.MergeFunction; import org.apache.flink.table.store.file.writer.RecordWriter; import org.apache.flink.table.types.logical.RowType; @@ -228,12 +228,12 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner { private void submitCompaction() throws Exception { finishCompaction(false); if (compactManager.isCompactionFinished()) { - compactManager.submitCompaction(levels); + compactManager.submitCompaction(); } } private void finishCompaction(boolean blocking) throws Exception { - Optional<CompactResult> result = compactManager.finishCompaction(levels, blocking); + Optional<CompactResult> result = compactManager.finishCompaction(blocking); result.ifPresent(this::updateCompactResult); } diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactStrategy.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactStrategy.java index a23dec01..50addb42 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactStrategy.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactStrategy.java @@ -18,6 +18,7 @@ package org.apache.flink.table.store.file.mergetree.compact; +import org.apache.flink.table.store.file.compact.CompactUnit; import org.apache.flink.table.store.file.mergetree.LevelSortedRun; import java.util.List; diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManager.java similarity index 80% rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactManager.java rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManager.java index 294e54fd..17c160b2 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactManager.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManager.java @@ -19,6 +19,9 @@ package org.apache.flink.table.store.file.mergetree.compact; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.store.file.compact.CompactManager; +import org.apache.flink.table.store.file.compact.CompactResult; +import org.apache.flink.table.store.file.compact.CompactUnit; import org.apache.flink.table.store.file.mergetree.Levels; import org.slf4j.Logger; @@ -28,15 +31,14 @@ import java.util.Comparator; import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.stream.Collectors; -/** Manager to submit compaction task. */ -public class CompactManager { +/** Compact manager for {@link org.apache.flink.table.store.file.KeyValueFileStore}. 
*/ +public class MergeTreeCompactManager extends CompactManager { - private static final Logger LOG = LoggerFactory.getLogger(CompactManager.class); + private static final Logger LOG = LoggerFactory.getLogger(MergeTreeCompactManager.class); - private final ExecutorService executor; + private final Levels levels; private final CompactStrategy strategy; @@ -46,27 +48,23 @@ public class CompactManager { private final CompactRewriter rewriter; - private Future<CompactResult> taskFuture; - - public CompactManager( + public MergeTreeCompactManager( ExecutorService executor, + Levels levels, CompactStrategy strategy, Comparator<RowData> keyComparator, long minFileSize, CompactRewriter rewriter) { - this.executor = executor; + super(executor); + this.levels = levels; + this.strategy = strategy; this.minFileSize = minFileSize; this.keyComparator = keyComparator; - this.strategy = strategy; this.rewriter = rewriter; } - public boolean isCompactionFinished() { - return taskFuture == null; - } - - /** Submit a new compaction task. */ - public void submitCompaction(Levels levels) { + @Override + public void submitCompaction() { if (taskFuture != null) { throw new IllegalStateException( "Please finish the previous compaction before submitting new one."); @@ -107,7 +105,8 @@ public class CompactManager { } private void submitCompaction(CompactUnit unit, boolean dropDelete) { - CompactTask task = new CompactTask(keyComparator, minFileSize, rewriter, unit, dropDelete); + MergeTreeCompactTask task = + new MergeTreeCompactTask(keyComparator, minFileSize, rewriter, unit, dropDelete); if (LOG.isDebugEnabled()) { LOG.debug( "Pick these files (name, level, size) for compaction: {}", @@ -123,22 +122,11 @@ public class CompactManager { } /** Finish current task, and update result files to {@link Levels}. 
*/ - public Optional<CompactResult> finishCompaction(Levels levels, boolean blocking) + @Override + public Optional<CompactResult> finishCompaction(boolean blocking) throws ExecutionException, InterruptedException { - Optional<CompactResult> result = finishCompaction(blocking); + Optional<CompactResult> result = super.finishCompaction(blocking); result.ifPresent(r -> levels.update(r.before(), r.after())); return result; } - - private Optional<CompactResult> finishCompaction(boolean blocking) - throws ExecutionException, InterruptedException { - if (taskFuture != null) { - if (blocking || taskFuture.isDone()) { - CompactResult result = taskFuture.get(); - taskFuture = null; - return Optional.of(result); - } - } - return Optional.empty(); - } } diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactTask.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactTask.java similarity index 59% rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactTask.java rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactTask.java index cac1000b..df0becb1 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactTask.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactTask.java @@ -19,23 +19,20 @@ package org.apache.flink.table.store.file.mergetree.compact; import org.apache.flink.table.data.RowData; +import org.apache.flink.table.store.file.compact.CompactResult; +import org.apache.flink.table.store.file.compact.CompactTask; +import org.apache.flink.table.store.file.compact.CompactUnit; import org.apache.flink.table.store.file.data.DataFileMeta; import org.apache.flink.table.store.file.mergetree.SortedRun; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.ArrayList; import java.util.Comparator; import java.util.List; -import java.util.concurrent.Callable; import static java.util.Collections.singletonList; -/** Compaction task. */ -public class CompactTask implements Callable<CompactResult> { - - private static final Logger LOG = LoggerFactory.getLogger(CompactTask.class); +/** Compact task for merge tree compaction. 
*/ +public class MergeTreeCompactTask extends CompactTask { private final long minFileSize; private final CompactRewriter rewriter; @@ -45,41 +42,30 @@ public class CompactTask implements Callable<CompactResult> { private final boolean dropDelete; - // metrics - private long rewriteInputSize; - private long rewriteOutputSize; - private int rewriteFilesNum; + // metric private int upgradeFilesNum; - public CompactTask( + public MergeTreeCompactTask( Comparator<RowData> keyComparator, long minFileSize, CompactRewriter rewriter, CompactUnit unit, boolean dropDelete) { + super(unit.files()); this.minFileSize = minFileSize; this.rewriter = rewriter; this.outputLevel = unit.outputLevel(); this.partitioned = new IntervalPartition(unit.files(), keyComparator).partition(); this.dropDelete = dropDelete; - this.rewriteInputSize = 0; - this.rewriteOutputSize = 0; - this.rewriteFilesNum = 0; this.upgradeFilesNum = 0; } @Override - public CompactResult call() throws Exception { - return compact(); - } - - private CompactResult compact() throws Exception { - long startMillis = System.currentTimeMillis(); - + protected CompactResult doCompact(List<DataFileMeta> inputs) throws Exception { List<List<SortedRun>> candidate = new ArrayList<>(); - List<DataFileMeta> before = new ArrayList<>(); - List<DataFileMeta> after = new ArrayList<>(); + List<DataFileMeta> compactBefore = new ArrayList<>(); + List<DataFileMeta> compactAfter = new ArrayList<>(); // Checking the order and compacting adjacent and contiguous files // Note: can't skip an intermediate file to compact, this will destroy the overall @@ -99,40 +85,47 @@ public class CompactTask implements Callable<CompactResult> { candidate.add(singletonList(SortedRun.fromSingle(file))); } else { // Large file appear, rewrite previous and upgrade it - rewrite(candidate, before, after); - upgrade(file, before, after); + rewrite(candidate, compactBefore, compactAfter); + upgrade(file, compactBefore, compactAfter); } } } } - rewrite(candidate, before, after); - - if (LOG.isDebugEnabled()) { - LOG.debug( - "Done compacting {} files to {} files in {}ms. 
" - + "Rewrite input size = {}, output size = {}, rewrite file num = {}, upgrade file num = {}", - before.size(), - after.size(), - System.currentTimeMillis() - startMillis, - rewriteInputSize, - rewriteOutputSize, - rewriteFilesNum, - upgradeFilesNum); - } + rewrite(candidate, compactBefore, compactAfter); + return new CompactResult() { + @Override + public List<DataFileMeta> before() { + return compactBefore; + } + + @Override + public List<DataFileMeta> after() { + return compactAfter; + } + }; + } - return result(before, after); + @Override + protected String logMetric( + long startMillis, List<DataFileMeta> compactBefore, List<DataFileMeta> compactAfter) { + return String.format( + "%s, upgrade file num = %d", + super.logMetric(startMillis, compactBefore, compactAfter), upgradeFilesNum); } - private void upgrade(DataFileMeta file, List<DataFileMeta> before, List<DataFileMeta> after) { + private void upgrade( + DataFileMeta file, List<DataFileMeta> compactBefore, List<DataFileMeta> compactAfter) { if (file.level() != outputLevel) { - before.add(file); - after.add(file.upgrade(outputLevel)); + compactBefore.add(file); + compactAfter.add(file.upgrade(outputLevel)); upgradeFilesNum++; } } private void rewrite( - List<List<SortedRun>> candidate, List<DataFileMeta> before, List<DataFileMeta> after) + List<List<SortedRun>> candidate, + List<DataFileMeta> compactBefore, + List<DataFileMeta> compactAfter) throws Exception { if (candidate.isEmpty()) { return; @@ -143,40 +136,15 @@ public class CompactTask implements Callable<CompactResult> { return; } else if (section.size() == 1) { for (DataFileMeta file : section.get(0).files()) { - upgrade(file, before, after); + upgrade(file, compactBefore, compactAfter); } candidate.clear(); return; } } - candidate.forEach( - runs -> - runs.forEach( - run -> { - before.addAll(run.files()); - rewriteInputSize += - run.files().stream() - .mapToLong(DataFileMeta::fileSize) - .sum(); - rewriteFilesNum += run.files().size(); - })); + candidate.forEach(runs -> runs.forEach(run -> compactBefore.addAll(run.files()))); List<DataFileMeta> result = rewriter.rewrite(outputLevel, dropDelete, candidate); - after.addAll(result); - rewriteOutputSize += result.stream().mapToLong(DataFileMeta::fileSize).sum(); + compactAfter.addAll(result); candidate.clear(); } - - private CompactResult result(List<DataFileMeta> before, List<DataFileMeta> after) { - return new CompactResult() { - @Override - public List<DataFileMeta> before() { - return before; - } - - @Override - public List<DataFileMeta> after() { - return after; - } - }; - } } diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java index 44d51af7..f4217431 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java @@ -19,6 +19,7 @@ package org.apache.flink.table.store.file.mergetree.compact; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.table.store.file.compact.CompactUnit; import org.apache.flink.table.store.file.mergetree.LevelSortedRun; import org.apache.flink.table.store.file.mergetree.SortedRun; diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java index 6494c31a..7f7168eb 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java @@ -166,7 +166,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan { Snapshot snapshot = snapshotManager.snapshot(snapshotId); manifests = isIncremental - ? manifestList.read(snapshot.deltaManifestList()) + ? readIncremental(snapshotId) : snapshot.readAllManifests(manifestList); } } @@ -289,4 +289,14 @@ public abstract class AbstractFileStoreScan implements FileStoreScan { private List<ManifestEntry> readManifestFileMeta(ManifestFileMeta manifest) { return manifestFileFactory.create().read(manifest.fileName()); } + + private List<ManifestFileMeta> readIncremental(Long snapshotId) { + Snapshot snapshot = snapshotManager.snapshot(snapshotId); + if (snapshot.commitKind() == Snapshot.CommitKind.APPEND) { + return manifestList.read(snapshot.deltaManifestList()); + } + throw new IllegalStateException( + String.format( + "Incremental scan does not accept %s snapshot", snapshot.commitKind())); + } } diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java index 7d8cc440..30456991 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java @@ -18,20 +18,26 @@ package org.apache.flink.table.store.file.operation; +import org.apache.flink.api.common.accumulators.LongCounter; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.binary.BinaryRowData; +import org.apache.flink.table.store.file.compact.CompactResult; +import org.apache.flink.table.store.file.data.AppendOnlyCompactManager; import org.apache.flink.table.store.file.data.AppendOnlyWriter; import org.apache.flink.table.store.file.data.DataFileMeta; import org.apache.flink.table.store.file.data.DataFilePathFactory; -import org.apache.flink.table.store.file.mergetree.compact.CompactResult; import org.apache.flink.table.store.file.utils.FileStorePathFactory; +import org.apache.flink.table.store.file.utils.RecordReaderIterator; import org.apache.flink.table.store.file.utils.SnapshotManager; import org.apache.flink.table.store.file.writer.RecordWriter; import org.apache.flink.table.store.format.FileFormat; +import org.apache.flink.table.store.table.source.Split; import org.apache.flink.table.types.logical.RowType; import javax.annotation.Nullable; +import java.util.Collections; +import java.util.LinkedList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; @@ -39,39 +45,51 @@ import java.util.concurrent.ExecutorService; /** {@link FileStoreWrite} for {@link org.apache.flink.table.store.file.AppendOnlyFileStore}. 
*/ public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite<RowData> { + private final AppendOnlyFileStoreRead read; private final long schemaId; private final RowType rowType; private final FileFormat fileFormat; private final FileStorePathFactory pathFactory; private final long targetFileSize; + private final int minFileNum; + private final int maxFileNum; + private final boolean commitForceCompact; public AppendOnlyFileStoreWrite( + AppendOnlyFileStoreRead read, long schemaId, RowType rowType, FileFormat fileFormat, FileStorePathFactory pathFactory, SnapshotManager snapshotManager, FileStoreScan scan, - long targetFileSize) { + long targetFileSize, + int minFileNum, + int maxFileNum, + boolean commitForceCompact) { super(snapshotManager, scan); + this.read = read; this.schemaId = schemaId; this.rowType = rowType; this.fileFormat = fileFormat; this.pathFactory = pathFactory; this.targetFileSize = targetFileSize; + this.maxFileNum = maxFileNum; + this.minFileNum = minFileNum; + this.commitForceCompact = commitForceCompact; } @Override public RecordWriter<RowData> createWriter( BinaryRowData partition, int bucket, ExecutorService compactExecutor) { return createWriter( - partition, bucket, getMaxSequenceNumber(scanExistingFileMetas(partition, bucket))); + partition, bucket, scanExistingFileMetas(partition, bucket), compactExecutor); } @Override public RecordWriter<RowData> createEmptyWriter( BinaryRowData partition, int bucket, ExecutorService compactExecutor) { - return createWriter(partition, bucket, -1L); + return createWriter(partition, bucket, Collections.emptyList(), compactExecutor); } @Override @@ -82,9 +100,48 @@ public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite<RowData> { } private RecordWriter<RowData> createWriter( - BinaryRowData partition, int bucket, long maxSeqNum) { + BinaryRowData partition, + int bucket, + List<DataFileMeta> restoredFiles, + ExecutorService compactExecutor) { + // let writer and compact manager hold the same reference + // and make restore files mutable to update + LinkedList<DataFileMeta> restored = new LinkedList<>(restoredFiles); DataFilePathFactory factory = pathFactory.createDataFilePathFactory(partition, bucket); return new AppendOnlyWriter( - schemaId, fileFormat, targetFileSize, rowType, maxSeqNum, factory); + schemaId, + fileFormat, + targetFileSize, + rowType, + restored, + new AppendOnlyCompactManager( + compactExecutor, + restored, + minFileNum, + maxFileNum, + targetFileSize, + compactRewriter(partition, bucket)), + commitForceCompact, + factory); + } + + private AppendOnlyCompactManager.CompactRewriter compactRewriter( + BinaryRowData partition, int bucket) { + return toCompact -> { + if (toCompact.isEmpty()) { + return Collections.emptyList(); + } + AppendOnlyWriter.RowRollingWriter rewriter = + AppendOnlyWriter.RowRollingWriter.createRollingRowWriter( + schemaId, + fileFormat, + targetFileSize, + rowType, + pathFactory.createDataFilePathFactory(partition, bucket), + new LongCounter(toCompact.get(0).minSequenceNumber())); + return rewriter.write( + new RecordReaderIterator<>( + read.createReader(new Split(partition, bucket, toCompact, false)))); + }; } } diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java index 862e7596..95cf0991 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java +++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java @@ -19,8 +19,8 @@ package org.apache.flink.table.store.file.operation; import org.apache.flink.table.data.binary.BinaryRowData; +import org.apache.flink.table.store.file.compact.CompactResult; import org.apache.flink.table.store.file.data.DataFileMeta; -import org.apache.flink.table.store.file.mergetree.compact.CompactResult; import org.apache.flink.table.store.file.writer.RecordWriter; import javax.annotation.Nullable; diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java index 09cd4604..574c7eb0 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java @@ -22,26 +22,26 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.binary.BinaryRowData; import org.apache.flink.table.store.CoreOptions; import org.apache.flink.table.store.file.KeyValue; +import org.apache.flink.table.store.file.compact.CompactManager; +import org.apache.flink.table.store.file.compact.CompactResult; +import org.apache.flink.table.store.file.compact.CompactUnit; import org.apache.flink.table.store.file.data.DataFileMeta; import org.apache.flink.table.store.file.data.DataFileReader; import org.apache.flink.table.store.file.data.DataFileWriter; import org.apache.flink.table.store.file.mergetree.Levels; import org.apache.flink.table.store.file.mergetree.MergeTreeReader; import org.apache.flink.table.store.file.mergetree.MergeTreeWriter; -import org.apache.flink.table.store.file.mergetree.compact.CompactManager; -import org.apache.flink.table.store.file.mergetree.compact.CompactResult; import org.apache.flink.table.store.file.mergetree.compact.CompactRewriter; import org.apache.flink.table.store.file.mergetree.compact.CompactStrategy; -import org.apache.flink.table.store.file.mergetree.compact.CompactTask; -import org.apache.flink.table.store.file.mergetree.compact.CompactUnit; import org.apache.flink.table.store.file.mergetree.compact.MergeFunction; +import org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactManager; +import org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactTask; import org.apache.flink.table.store.file.mergetree.compact.UniversalCompaction; import org.apache.flink.table.store.file.schema.SchemaManager; import org.apache.flink.table.store.file.utils.FileStorePathFactory; import org.apache.flink.table.store.file.utils.RecordReaderIterator; import org.apache.flink.table.store.file.utils.SnapshotManager; import org.apache.flink.table.store.file.writer.RecordWriter; -import org.apache.flink.table.store.format.FileFormat; import org.apache.flink.table.types.logical.RowType; import javax.annotation.Nullable; @@ -69,7 +69,6 @@ public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> { RowType valueType, Supplier<Comparator<RowData>> keyComparatorSupplier, MergeFunction mergeFunction, - FileFormat fileFormat, FileStorePathFactory pathFactory, SnapshotManager snapshotManager, FileStoreScan scan, @@ -77,13 +76,18 @@ public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> { super(snapshotManager, scan); this.dataFileReaderFactory = new DataFileReader.Factory( - 
schemaManager, schemaId, keyType, valueType, fileFormat, pathFactory); + schemaManager, + schemaId, + keyType, + valueType, + options.fileFormat(), + pathFactory); this.dataFileWriterFactory = new DataFileWriter.Factory( schemaId, keyType, valueType, - fileFormat, + options.fileFormat(), pathFactory, options.targetFileSize()); this.keyComparatorSupplier = keyComparatorSupplier; @@ -115,7 +119,8 @@ public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> { Levels levels = new Levels(keyComparator, compactFiles, options.numLevels()); CompactUnit unit = CompactUnit.fromLevelRuns(levels.numberOfLevels() - 1, levels.levelSortedRuns()); - return new CompactTask(keyComparator, options.targetFileSize(), rewriter, unit, true); + return new MergeTreeCompactTask( + keyComparator, options.targetFileSize(), rewriter, unit, true); } private MergeTreeWriter createMergeTreeWriter( @@ -125,6 +130,7 @@ public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> { ExecutorService compactExecutor) { DataFileWriter dataFileWriter = dataFileWriterFactory.create(partition, bucket); Comparator<RowData> keyComparator = keyComparatorSupplier.get(); + Levels levels = new Levels(keyComparator, restoreFiles, options.numLevels()); return new MergeTreeWriter( dataFileWriter.keyType(), dataFileWriter.valueType(), @@ -133,10 +139,11 @@ public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> { bucket, new UniversalCompaction( options.maxSizeAmplificationPercent(), - options.sizeRatio(), + options.sortedRunSizeRatio(), options.numSortedRunCompactionTrigger()), - compactExecutor), - new Levels(keyComparator, restoreFiles, options.numLevels()), + compactExecutor, + levels), + levels, getMaxSequenceNumber(restoreFiles), keyComparator, mergeFunction.copy(), @@ -150,11 +157,13 @@ public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> { BinaryRowData partition, int bucket, CompactStrategy compactStrategy, - ExecutorService compactExecutor) { + ExecutorService compactExecutor, + Levels levels) { Comparator<RowData> keyComparator = keyComparatorSupplier.get(); CompactRewriter rewriter = compactRewriter(partition, bucket, keyComparator); - return new CompactManager( + return new MergeTreeCompactManager( compactExecutor, + levels, compactStrategy, keyComparator, options.targetFileSize(), diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCompact.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCompact.java index 68014345..21161345 100644 --- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCompact.java +++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCompact.java @@ -19,9 +19,9 @@ package org.apache.flink.table.store.table.sink; import org.apache.flink.table.data.binary.BinaryRowData; +import org.apache.flink.table.store.file.compact.CompactResult; import org.apache.flink.table.store.file.data.DataFileMeta; import org.apache.flink.table.store.file.mergetree.Increment; -import org.apache.flink.table.store.file.mergetree.compact.CompactResult; import org.apache.flink.table.store.file.operation.FileStoreScan; import org.apache.flink.table.store.file.operation.FileStoreWrite; import org.apache.flink.table.store.file.predicate.PredicateConverter; diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyCompactManagerTest.java 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyCompactManagerTest.java new file mode 100644 index 00000000..9979f080 --- /dev/null +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyCompactManagerTest.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.store.file.data; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; + +import static org.apache.flink.table.store.file.data.DataFileTestUtils.newFile; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link org.apache.flink.table.store.file.data.AppendOnlyCompactManager}. */ +public class AppendOnlyCompactManagerTest { + + @Test + public void testPickEmptyAndNotRelease() { + // 1~50 is small enough, so hold it + List<DataFileMeta> toCompact = Collections.singletonList(newFile(1L, 50L)); + innerTest(toCompact, false, Collections.emptyList(), toCompact); + } + + @Test + public void testPickEmptyAndRelease() { + // large file, release + innerTest( + Collections.singletonList(newFile(1L, 1024L)), + false, + Collections.emptyList(), + Collections.emptyList()); + + // small files at the tail, release the previous large ones + innerTest( + Arrays.asList(newFile(1L, 1024L), newFile(1025L, 2049L), newFile(2050L, 2100L)), + false, + Collections.emptyList(), + Collections.singletonList(newFile(2050L, 2100L))); + innerTest( + Arrays.asList( + newFile(1L, 1024L), + newFile(1025L, 2049L), + newFile(2050L, 2100L), + newFile(2100L, 2110L)), + false, + Collections.emptyList(), + Arrays.asList(newFile(2050L, 2100L), newFile(2100L, 2110L))); + innerTest( + Arrays.asList(newFile(1L, 1024L), newFile(1025L, 2049L), newFile(2050L, 2500L)), + false, + Collections.emptyList(), + Collections.singletonList(newFile(2050L, 2500L))); + innerTest( + Arrays.asList( + newFile(1L, 1024L), + newFile(1025L, 2049L), + newFile(2050L, 2500L), + newFile(2501L, 4096L), + newFile(4097L, 6000L), + newFile(6001L, 7000L), + newFile(7001L, 7600L)), + false, + Collections.emptyList(), + Collections.singletonList(newFile(7001L, 7600L))); + + // ignore single small file (in the middle) + innerTest( + Arrays.asList( + newFile(1L, 1024L), + newFile(1025L, 2049L), + newFile(2050L, 2500L), + newFile(2501L, 4096L)), + false, + Collections.emptyList(), + Collections.singletonList(newFile(2501L, 4096L))); + + innerTest( + Arrays.asList( + newFile(1L, 1024L), + newFile(1025L, 2049L), + newFile(2050L, 2500L), + newFile(2501L, 4096L), + newFile(4097L, 6000L)), + false, + Collections.emptyList(), + Collections.singletonList(newFile(4097L, 6000L))); + + // wait for more files
innerTest( + Arrays.asList(newFile(1L, 500L), newFile(501L, 1000L)), + false, + Collections.emptyList(), + Arrays.asList(newFile(1L, 500L), newFile(501L, 1000L))); + + innerTest( + Arrays.asList(newFile(1L, 500L), newFile(501L, 1000L), newFile(1001L, 2026L)), + false, + Collections.emptyList(), + Arrays.asList(newFile(501L, 1000L), newFile(1001L, 2026L))); + + innerTest( + Arrays.asList(newFile(1L, 2000L), newFile(2001L, 2005L), newFile(2006L, 2010L)), + false, + Collections.emptyList(), + Arrays.asList(newFile(2001L, 2005L), newFile(2006L, 2010L))); + } + + @Test + public void testPick() { + // fileNum is 13 (which > 12) and totalFileSize is 130 (which < 1024) + List<DataFileMeta> toCompact1 = + Arrays.asList( + // 1~10, 11~20, ..., 111~120 + newFile(1L, 10L), + newFile(11L, 20L), + newFile(21L, 30L), + newFile(31L, 40L), + newFile(41L, 50L), + newFile(51L, 60L), + newFile(61L, 70L), + newFile(71L, 80L), + newFile(81L, 90L), + newFile(91L, 100L), + newFile(101L, 110L), + newFile(111L, 120L), + newFile(121L, 130L)); + innerTest( + toCompact1, + true, + toCompact1.subList(0, toCompact1.size() - 1), + Collections.singletonList(newFile(121L, 130L))); + + // fileNum is 4 (which > 3) and totalFileSize is 1025 (which > 1024) + List<DataFileMeta> toCompact2 = + Arrays.asList( + // 1~2, 3~500, 501~1000, 1001~1025 + newFile(1L, 2L), + newFile(3L, 500L), + newFile(501L, 1000L), + newFile(1001L, 1025L), + newFile(1026L, 1050L)); + innerTest( + toCompact2, + true, + toCompact2.subList(0, 4), + Collections.singletonList(newFile(1026L, 1050L))); + + // after the first three files are released, fileNum is 13 (which > 12) and totalFileSize is 580 (which < 1024) + List<DataFileMeta> toCompact3 = + Arrays.asList( + newFile(1L, 1022L), + newFile(1023L, 1024L), + newFile(1025L, 2050L), + // 2051~2510, ..., 2611~2620 + newFile(2051L, 2510L), + newFile(2511L, 2520L), + newFile(2521L, 2530L), + newFile(2531L, 2540L), + newFile(2541L, 2550L), + newFile(2551L, 2560L), + newFile(2561L, 2570L), + newFile(2571L, 2580L), + newFile(2581L, 2590L), + newFile(2591L, 2600L), + newFile(2601L, 2610L), + newFile(2611L, 2620L), + newFile(2621L, 2630L)); + innerTest( + toCompact3, + true, + toCompact3.subList(3, toCompact3.size() - 1), + Collections.singletonList(newFile(2621L, 2630L))); + } + + private void innerTest( + List<DataFileMeta> toCompactBeforePick, + boolean expectedPresent, + List<DataFileMeta> expectedCompactBefore, + List<DataFileMeta> toCompactAfterPick) { + int minFileNum = 4; + int maxFileNum = 12; + long targetFileSize = 1024; + AppendOnlyCompactManager manager = + new AppendOnlyCompactManager( + null, + new LinkedList<>(toCompactBeforePick), + minFileNum, + maxFileNum, + targetFileSize, + null); + Optional<List<DataFileMeta>> actual = manager.pickCompactBefore(); + assertThat(actual.isPresent()).isEqualTo(expectedPresent); + if (expectedPresent) { + assertThat(actual.get()).containsExactlyElementsOf(expectedCompactBefore); + } + assertThat(manager.getToCompact()).containsExactlyElementsOf(toCompactAfterPick); + } +} diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java index 3fdeb330..a60380c0 100644 --- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java @@ -35,13 +35,18 @@ import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.logical.VarCharType; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import java.util.ArrayList; import java.util.Collections; +import java.util.LinkedList; import java.util.List; +import java.util.UUID; +import java.util.concurrent.Executors; import static org.assertj.core.api.Assertions.assertThat; @@ -61,6 +66,9 @@ public class AppendOnlyWriterTest { private static final String AVRO = "avro"; private static final String PART = "2022-05-01"; + private static final long SCHEMA_ID = 0L; + private static final int MIN_FILE_NUM = 3; + private static final int MAX_FILE_NUM = 4; @BeforeEach public void before() { @@ -69,7 +77,7 @@ public class AppendOnlyWriterTest { @Test public void testEmptyCommits() throws Exception { - RecordWriter<RowData> writer = createWriter(1024 * 1024L, SCHEMA, 0); + RecordWriter<RowData> writer = createEmptyWriter(1024 * 1024L); for (int i = 0; i < 3; i++) { writer.sync(); @@ -83,7 +91,7 @@ public class AppendOnlyWriterTest { @Test public void testSingleWrite() throws Exception { - RecordWriter<RowData> writer = createWriter(1024 * 1024L, SCHEMA, 0); + RecordWriter<RowData> writer = createEmptyWriter(1024 * 1024L); writer.write(row(1, "AAA", PART)); List<DataFileMeta> result = writer.close(); @@ -106,14 +114,14 @@ public class AppendOnlyWriterTest { }; assertThat(meta.valueStats()).isEqualTo(STATS_SERIALIZER.toBinary(expected)); - assertThat(meta.minSequenceNumber()).isEqualTo(1); - assertThat(meta.maxSequenceNumber()).isEqualTo(1); + assertThat(meta.minSequenceNumber()).isEqualTo(0); + assertThat(meta.maxSequenceNumber()).isEqualTo(0); assertThat(meta.level()).isEqualTo(DataFileMeta.DUMMY_LEVEL); } @Test public void testMultipleCommits() throws Exception { - RecordWriter<RowData> writer = createWriter(1024 * 1024L, SCHEMA, 0); + RecordWriter<RowData> writer = createWriter(1024 * 1024L, true, Collections.emptyList()); // Commit 5 consecutive txns. 
for (int txn = 0; txn < 5; txn += 1) { @@ -127,8 +135,25 @@ public class AppendOnlyWriterTest { writer.sync(); Increment inc = writer.prepareCommit(); - assertThat(inc.compactBefore()).isEqualTo(Collections.emptyList()); - assertThat(inc.compactAfter()).isEqualTo(Collections.emptyList()); + if (txn > 0 && txn % 3 == 0) { + assertThat(inc.compactBefore()).hasSize(4); + assertThat(inc.compactAfter()).hasSize(1); + DataFileMeta compactAfter = inc.compactAfter().get(0); + assertThat(compactAfter.fileName()).startsWith("compact-"); + assertThat(compactAfter.fileSize()) + .isEqualTo( + inc.compactBefore().stream() + .mapToLong(DataFileMeta::fileSize) + .sum()); + assertThat(compactAfter.rowCount()) + .isEqualTo( + inc.compactBefore().stream() + .mapToLong(DataFileMeta::rowCount) + .sum()); + } else { + assertThat(inc.compactBefore()).isEqualTo(Collections.emptyList()); + assertThat(inc.compactAfter()).isEqualTo(Collections.emptyList()); + } assertThat(inc.newFiles().size()).isEqualTo(1); DataFileMeta meta = inc.newFiles().get(0); @@ -149,8 +174,8 @@ public class AppendOnlyWriterTest { }; assertThat(meta.valueStats()).isEqualTo(STATS_SERIALIZER.toBinary(expected)); - assertThat(meta.minSequenceNumber()).isEqualTo(start + 1); - assertThat(meta.maxSequenceNumber()).isEqualTo(end); + assertThat(meta.minSequenceNumber()).isEqualTo(start); + assertThat(meta.maxSequenceNumber()).isEqualTo(end - 1); assertThat(meta.level()).isEqualTo(DataFileMeta.DUMMY_LEVEL); } } @@ -159,21 +184,21 @@ public class AppendOnlyWriterTest { public void testRollingWrite() throws Exception { // Set a very small target file size, so that we will roll over to a new file even if // writing one record. - RecordWriter<RowData> writer = createWriter(10L, SCHEMA, 0); + AppendOnlyWriter writer = createEmptyWriter(10L); for (int i = 0; i < 10; i++) { writer.write(row(i, String.format("%03d", i), PART)); } writer.sync(); - Increment inc = writer.prepareCommit(); - assertThat(inc.compactBefore()).isEqualTo(Collections.emptyList()); - assertThat(inc.compactAfter()).isEqualTo(Collections.emptyList()); + Increment firstInc = writer.prepareCommit(); + assertThat(firstInc.compactBefore()).isEqualTo(Collections.emptyList()); + assertThat(firstInc.compactAfter()).isEqualTo(Collections.emptyList()); - assertThat(inc.newFiles().size()).isEqualTo(10); + assertThat(firstInc.newFiles().size()).isEqualTo(10); int id = 0; - for (DataFileMeta meta : inc.newFiles()) { + for (DataFileMeta meta : firstInc.newFiles()) { Path path = pathFactory.toPath(meta.fileName()); assertThat(path.getFileSystem().exists(path)).isTrue(); @@ -190,12 +215,48 @@ public class AppendOnlyWriterTest { }; assertThat(meta.valueStats()).isEqualTo(STATS_SERIALIZER.toBinary(expected)); - assertThat(meta.minSequenceNumber()).isEqualTo(id + 1); - assertThat(meta.maxSequenceNumber()).isEqualTo(id + 1); + assertThat(meta.minSequenceNumber()).isEqualTo(id); + assertThat(meta.maxSequenceNumber()).isEqualTo(id); assertThat(meta.level()).isEqualTo(DataFileMeta.DUMMY_LEVEL); id += 1; } + + // increase target file size to test compaction + long targetFileSize = 1024 * 1024L; + writer = createWriter(targetFileSize, true, firstInc.newFiles()); + assertThat(writer.getToCompact()).containsExactlyElementsOf(firstInc.newFiles()); + writer.write(row(id, String.format("%03d", id), PART)); + writer.sync(); + Increment secInc = writer.prepareCommit(); + + // check compact before and after + List<DataFileMeta> compactBefore = secInc.compactBefore(); + List<DataFileMeta> compactAfter = 
secInc.compactAfter(); + assertThat(compactBefore) + .containsExactlyInAnyOrderElementsOf(firstInc.newFiles().subList(0, 4)); + assertThat(compactAfter).hasSize(1); + assertThat(compactBefore.stream().mapToLong(DataFileMeta::fileSize).sum()) + .isEqualTo(compactAfter.stream().mapToLong(DataFileMeta::fileSize).sum()); + assertThat(compactBefore.stream().mapToLong(DataFileMeta::rowCount).sum()) + .isEqualTo(compactAfter.stream().mapToLong(DataFileMeta::rowCount).sum()); + // check seq number + assertThat(compactBefore.get(0).minSequenceNumber()) + .isEqualTo(compactAfter.get(0).minSequenceNumber()); + assertThat(compactBefore.get(compactBefore.size() - 1).maxSequenceNumber()) + .isEqualTo(compactAfter.get(compactAfter.size() - 1).maxSequenceNumber()); + assertThat(secInc.newFiles()).hasSize(1); + + /* check toCompact[round + 1] is composed of + * <1> the compactAfter[round] (due to small size) + * <2> the rest of toCompact[round] + * <3> the newFiles[round] + * with strict order + */ + List<DataFileMeta> toCompact = new ArrayList<>(compactAfter); + toCompact.addAll(firstInc.newFiles().subList(4, firstInc.newFiles().size())); + toCompact.addAll(secInc.newFiles()); + assertThat(writer.getToCompact()).containsExactlyElementsOf(toCompact); } private FieldStats initStats(Integer min, Integer max, long nullCount) { @@ -214,15 +275,68 @@ public class AppendOnlyWriterTest { return new DataFilePathFactory( new Path(tempDir.toString()), "dt=" + PART, - 1, + 0, CoreOptions.FILE_FORMAT.defaultValue()); } - private RecordWriter<RowData> createWriter( - long targetFileSize, RowType writeSchema, long maxSeqNum) { - FileFormat fileFormat = FileFormat.fromIdentifier(AVRO, new Configuration()); + private AppendOnlyWriter createEmptyWriter(long targetFileSize) { + return createWriter(targetFileSize, false, Collections.emptyList()); + } + private AppendOnlyWriter createWriter( + long targetFileSize, boolean forceCompact, List<DataFileMeta> scannedFiles) { + FileFormat fileFormat = FileFormat.fromIdentifier(AVRO, new Configuration()); + LinkedList<DataFileMeta> toCompact = new LinkedList<>(scannedFiles); return new AppendOnlyWriter( - 0, fileFormat, targetFileSize, writeSchema, maxSeqNum, pathFactory); + SCHEMA_ID, + fileFormat, + targetFileSize, + AppendOnlyWriterTest.SCHEMA, + toCompact, + new AppendOnlyCompactManager( + Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("compaction-thread")), + toCompact, + MIN_FILE_NUM, + MAX_FILE_NUM, + targetFileSize, + compactBefore -> + compactBefore.isEmpty() + ? 
Collections.emptyList() + : Collections.singletonList( + generateCompactAfter(compactBefore))), + forceCompact, + pathFactory); + } + + private DataFileMeta generateCompactAfter(List<DataFileMeta> toCompact) { + int size = toCompact.size(); + long minSeq = toCompact.get(0).minSequenceNumber(); + long maxSeq = toCompact.get(size - 1).maxSequenceNumber(); + String fileName = "compact-" + UUID.randomUUID(); + return DataFileMeta.forAppend( + fileName, + toCompact.stream().mapToLong(DataFileMeta::fileSize).sum(), + toCompact.stream().mapToLong(DataFileMeta::rowCount).sum(), + STATS_SERIALIZER.toBinary( + new FieldStats[] { + initStats( + toCompact.get(0).valueStats().min().getInt(0), + toCompact.get(size - 1).valueStats().max().getInt(0), + 0), + initStats( + toCompact.get(0).valueStats().min().getString(1).toString(), + toCompact + .get(size - 1) + .valueStats() + .max() + .getString(1) + .toString(), + 0), + initStats(PART, PART, 0) + }), + minSeq, + maxSeq, + toCompact.get(0).schemaId()); } } diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTestUtils.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTestUtils.java index d515a891..b977c6a8 100644 --- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTestUtils.java +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTestUtils.java @@ -32,6 +32,21 @@ public class DataFileTestUtils { return newFile("", level, minKey, maxKey, maxSequence); } + public static DataFileMeta newFile(long minSeq, long maxSeq) { + return new DataFileMeta( + "", + maxSeq - minSeq + 1, + maxSeq - minSeq + 1, + DataFileMeta.EMPTY_MIN_KEY, + DataFileMeta.EMPTY_MAX_KEY, + DataFileMeta.EMPTY_KEY_STATS, + null, + minSeq, + maxSeq, + 0L, + DataFileMeta.DUMMY_LEVEL); + } + public static DataFileMeta newFile( String name, int level, int minKey, int maxKey, long maxSequence) { return new DataFileMeta( diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileFormatSuffixTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileFormatSuffixTest.java index 2288c931..4be7c693 100644 --- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileFormatSuffixTest.java +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileFormatSuffixTest.java @@ -22,6 +22,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.StringData; +import org.apache.flink.table.store.file.data.AppendOnlyCompactManager; import org.apache.flink.table.store.file.data.AppendOnlyWriter; import org.apache.flink.table.store.file.data.DataFileMeta; import org.apache.flink.table.store.file.data.DataFilePathFactory; @@ -37,6 +38,7 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import java.util.LinkedList; import java.util.List; /** test file format suffix. 
*/ @@ -57,8 +59,17 @@ public class FileFormatSuffixTest extends DataFileTest { DataFilePathFactory dataFilePathFactory = new DataFilePathFactory(new Path(tempDir.toString()), "dt=1", 1, format); FileFormat fileFormat = FileFormat.fromIdentifier(format, new Configuration()); + LinkedList<DataFileMeta> toCompact = new LinkedList<>(); AppendOnlyWriter appendOnlyWriter = - new AppendOnlyWriter(0, fileFormat, 10, SCHEMA, 10, dataFilePathFactory); + new AppendOnlyWriter( + 0, + fileFormat, + 10, + SCHEMA, + toCompact, + new AppendOnlyCompactManager(null, toCompact, 4, 10, 10, null), // not used + false, + dataFilePathFactory); appendOnlyWriter.write( GenericRowData.of(1, StringData.fromString("aaa"), StringData.fromString("1"))); List<DataFileMeta> result = appendOnlyWriter.close(); diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java index df629d52..9867811d 100644 --- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java @@ -29,7 +29,7 @@ import java.util.Arrays; import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; -import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row; +import static org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactManagerTest.row; import static org.apache.flink.table.store.file.stats.StatsTestUtils.newTableStats; import static org.assertj.core.api.Assertions.assertThat; diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LevelsTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LevelsTest.java index 674bb2ec..aed28096 100644 --- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LevelsTest.java +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LevelsTest.java @@ -27,7 +27,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.Comparator; -import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row; +import static org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactManagerTest.row; import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link Levels}. 
*/ diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java index eecb0f30..699eeb54 100644 --- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java @@ -32,11 +32,11 @@ import org.apache.flink.table.store.file.data.DataFileReader; import org.apache.flink.table.store.file.data.DataFileWriter; import org.apache.flink.table.store.file.format.FlushingFileFormat; import org.apache.flink.table.store.file.memory.HeapMemorySegmentPool; -import org.apache.flink.table.store.file.mergetree.compact.CompactManager; import org.apache.flink.table.store.file.mergetree.compact.CompactRewriter; import org.apache.flink.table.store.file.mergetree.compact.CompactStrategy; import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction; import org.apache.flink.table.store.file.mergetree.compact.IntervalPartition; +import org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactManager; import org.apache.flink.table.store.file.mergetree.compact.UniversalCompaction; import org.apache.flink.table.store.file.schema.SchemaManager; import org.apache.flink.table.store.file.utils.FileStorePathFactory; @@ -270,7 +270,7 @@ public class MergeTreeTest { new MergeTreeWriter( dataFileWriter.keyType(), dataFileWriter.valueType(), - createCompactManager(dataFileWriter, service), + createCompactManager(dataFileWriter, service, files), new Levels(comparator, files, options.numLevels()), maxSequenceNumber, comparator, @@ -284,17 +284,19 @@ public class MergeTreeTest { return writer; } - private CompactManager createCompactManager( - DataFileWriter dataFileWriter, ExecutorService compactExecutor) { - CompactStrategy compactStrategy = + private MergeTreeCompactManager createCompactManager( + DataFileWriter dataFileWriter, + ExecutorService compactExecutor, + List<DataFileMeta> files) { + CompactStrategy strategy = new UniversalCompaction( options.maxSizeAmplificationPercent(), - options.sizeRatio(), + options.sortedRunSizeRatio(), options.numSortedRunCompactionTrigger()); CompactRewriter rewriter = (outputLevel, dropDelete, sections) -> dataFileWriter.write( - new RecordReaderIterator<KeyValue>( + new RecordReaderIterator<>( new MergeTreeReader( sections, dropDelete, @@ -302,8 +304,13 @@ public class MergeTreeTest { comparator, new DeduplicateMergeFunction())), outputLevel); - return new CompactManager( - compactExecutor, compactStrategy, comparator, options.targetFileSize(), rewriter); + return new MergeTreeCompactManager( + compactExecutor, + new Levels(comparator, files, options.numLevels()), + strategy, + comparator, + options.targetFileSize(), + rewriter); } private void mergeCompacted( diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManagerTest.java similarity index 96% rename from flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java rename to flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManagerTest.java index de636f63..64c46c81 100644 --- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManagerTest.java @@ -20,6 +20,7 @@ package org.apache.flink.table.store.file.mergetree.compact; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.binary.BinaryRowData; +import org.apache.flink.table.store.file.compact.CompactUnit; import org.apache.flink.table.store.file.data.DataFileMeta; import org.apache.flink.table.store.file.data.DataFileTestUtils; import org.apache.flink.table.store.file.mergetree.Levels; @@ -44,8 +45,8 @@ import java.util.stream.Collectors; import static org.apache.flink.table.store.file.data.DataFileTestUtils.newFile; import static org.assertj.core.api.Assertions.assertThat; -/** Test for {@link CompactManager}. */ -public class CompactManagerTest { +/** Test for {@link MergeTreeCompactManager}. */ +public class MergeTreeCompactManagerTest { private final Comparator<RowData> comparator = Comparator.comparingInt(o -> o.getInt(0)); @@ -194,11 +195,11 @@ public class CompactManagerTest { files.add(minMax.toFile(i)); } Levels levels = new Levels(comparator, files, 3); - CompactManager manager = - new CompactManager( - service, strategy, comparator, 2, testRewriter(expectedDropDelete)); - manager.submitCompaction(levels); - manager.finishCompaction(levels, true); + MergeTreeCompactManager manager = + new MergeTreeCompactManager( + service, levels, strategy, comparator, 2, testRewriter(expectedDropDelete)); + manager.submitCompaction(); + manager.finishCompaction(true); List<LevelMinMax> outputs = levels.allFiles().stream().map(LevelMinMax::new).collect(Collectors.toList()); assertThat(outputs).isEqualTo(expected); diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java index 7dcb6f61..a42ede9c 100644 --- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java +++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java @@ -18,6 +18,7 @@ package org.apache.flink.table.store.file.mergetree.compact; +import org.apache.flink.table.store.file.compact.CompactUnit; import org.apache.flink.table.store.file.data.DataFileMeta; import org.apache.flink.table.store.file.mergetree.LevelSortedRun; import org.apache.flink.table.store.file.mergetree.SortedRun; diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java index 8c82a433..70b45314 100644 --- a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java +++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java @@ -58,7 +58,7 @@ import java.util.stream.IntStream; import static org.apache.flink.table.data.binary.BinaryRowDataUtil.EMPTY_ROW; import static org.apache.flink.table.store.CoreOptions.LOG_CHANGELOG_MODE; import static org.apache.flink.table.store.CoreOptions.LOG_CONSISTENCY; -import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row; +import static 
org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactManagerTest.row; import static org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS; import static org.apache.flink.table.store.kafka.KafkaLogOptions.TOPIC;
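
A note on the trigger semantics pinned down by AppendOnlyCompactManagerTest above: a run of files at the head of the to-compact queue qualifies for a background compaction either by size or by count. Reduced to a predicate, it looks roughly like the sketch below (names are illustrative; the committed AppendOnlyCompactManager additionally decides when to release or hold files that are already at the target size, which is not shown):

import java.util.List;

/** Illustrative sketch of the append-only compaction trigger; not the committed code. */
class AppendOnlyCompactTriggerSketch {

    /**
     * A file run [f_0,...,f_i] should be compacted once it reaches the target
     * size with at least minFileNum inputs, or once maxFileNum files pile up
     * even though their total size is still below targetFileSize (the
     * compaction.early-max.file-num option).
     */
    static boolean shouldCompact(
            List<Long> fileSizes, int minFileNum, int maxFileNum, long targetFileSize) {
        long totalFileSize = fileSizes.stream().mapToLong(Long::longValue).sum();
        int fileNum = fileSizes.size();
        return (totalFileSize >= targetFileSize && fileNum >= minFileNum)
                || fileNum >= maxFileNum;
    }
}

With the settings used by the test (minFileNum = 4, maxFileNum = 12, targetFileSize = 1024), twelve 10-byte files trigger via the file-count path, while four files totalling 1025 bytes trigger via the size path.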
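The hunks above quote only the subclass half of the task refactor; the extracted base class (flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactTask.java in the diffstat) is elided. A plausible minimal shape, inferred from the super(unit.files()) call and the doCompact/logMetric overrides in MergeTreeCompactTask (placeholder types throughout; the committed class is the source of truth):

import java.util.List;
import java.util.concurrent.Callable;

/** Simplified stand-in for CompactResult. */
interface ResultSketch<F> {
    List<F> before();

    List<F> after();
}

/** Sketch of the extracted template: timing and metric logging live in the base task. */
abstract class CompactTaskSketch<F> implements Callable<ResultSketch<F>> {

    private final List<F> inputs;

    protected CompactTaskSketch(List<F> inputs) {
        this.inputs = inputs;
    }

    @Override
    public ResultSketch<F> call() throws Exception {
        long startMillis = System.currentTimeMillis();
        ResultSketch<F> result = doCompact(inputs);
        // A subclass appends its own counters (e.g. upgrade file num) by
        // overriding logMetric and delegating to super.logMetric(...).
        System.out.println(logMetric(startMillis, result.before(), result.after()));
        return result;
    }

    /** The concrete compaction strategy, supplied by each subclass. */
    protected abstract ResultSketch<F> doCompact(List<F> inputs) throws Exception;

    protected String logMetric(long startMillis, List<F> compactBefore, List<F> compactAfter) {
        return String.format(
                "Done compacting %d files to %d files in %d ms",
                compactBefore.size(),
                compactAfter.size(),
                System.currentTimeMillis() - startMillis);
    }
}

This is the classic template-method split: MergeTreeCompactTask keeps the interval-partition and upgrade logic, while both the merge-tree and the append-only paths share the same timing and logging.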
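AppendOnlyWriterTest's rolling-write case asserts the metadata invariant of an append-only compaction: the single rewritten file covers exactly the bytes, rows, and sequence-number range of the run it replaces. As a sketch (field names illustrative):

import java.util.List;

/** Sketch of the invariant asserted on compactBefore/compactAfter above. */
class CompactInvariantSketch {

    static final class FileMeta {
        final long fileSize;
        final long rowCount;
        final long minSeq;
        final long maxSeq;

        FileMeta(long fileSize, long rowCount, long minSeq, long maxSeq) {
            this.fileSize = fileSize;
            this.rowCount = rowCount;
            this.minSeq = minSeq;
            this.maxSeq = maxSeq;
        }
    }

    /** Metadata of the one compactAfter file expected for a run of compactBefore files. */
    static FileMeta expectedCompactAfter(List<FileMeta> compactBefore) {
        long size = compactBefore.stream().mapToLong(f -> f.fileSize).sum();
        long rows = compactBefore.stream().mapToLong(f -> f.rowCount).sum();
        // inputs are contiguous and ordered, so the sequence range is preserved
        long minSeq = compactBefore.get(0).minSeq;
        long maxSeq = compactBefore.get(compactBefore.size() - 1).maxSeq;
        return new FileMeta(size, rows, minSeq, maxSeq);
    }
}

This mirrors what the test's generateCompactAfter helper builds, and what compactRewriter in AppendOnlyFileStoreWrite preserves by seeding its rolling writer with a LongCounter starting at the first input's minSequenceNumber.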
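Finally, the block comment at the end of testRollingWrite fixes the order in which the shared to-compact queue is rebuilt after each commit; as a generic sketch:

import java.util.ArrayList;
import java.util.List;

/** Sketch of the queue invariant asserted at the end of testRollingWrite. */
class ToCompactOrderSketch {

    static <F> List<F> nextToCompact(
            List<F> compactAfter, List<F> restOfToCompact, List<F> newFiles) {
        // the compacted output is re-queued first (it is still below the target size),
        List<F> next = new ArrayList<>(compactAfter);
        // followed by the files that were not picked in this round,
        next.addAll(restOfToCompact);
        // followed by the files newly written by this commit, in strict order
        next.addAll(newFiles);
        return next;
    }
}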