This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f760307 [FLINK-27708] Add background compaction task for append-only table when ingesting
9f760307 is described below

commit 9f760307f9da361ddb4ee786a9f3ab1651db149c
Author: Jane Chan <55568005+ladyfor...@users.noreply.github.com>
AuthorDate: Mon Jul 11 11:53:55 2022 +0800

    [FLINK-27708] Add background compaction task for append-only table when ingesting
    
    This closes #182
---
 .../shortcodes/generated/core_configuration.html   |  18 +-
 ...lyTableTest.java => AppendOnlyTableITCase.java} |  90 +++++++--
 .../connector/sink/CommittableSerializerTest.java  |   2 +-
 .../sink/FileCommittableSerializerTest.java        |   2 +-
 .../source/FileStoreSourceReaderTest.java          |   2 +-
 .../source/FileStoreSourceSplitGeneratorTest.java  |   2 +-
 .../source/FileStoreSourceSplitReaderTest.java     |   2 +-
 .../source/FileStoreSourceSplitSerializerTest.java |   2 +-
 .../source/FileStoreSourceSplitStateTest.java      |   2 +-
 .../PendingSplitsCheckpointSerializerTest.java     |   2 +-
 .../source/StaticFileStoreSplitEnumeratorTest.java |   2 +-
 .../source/TestChangelogDataReadWrite.java         |   5 +-
 .../org/apache/flink/table/store/CoreOptions.java  |  33 +++-
 .../table/store/file/AppendOnlyFileStore.java      |   6 +-
 .../flink/table/store/file/KeyValueFileStore.java  |   1 -
 .../table/store/file/compact/CompactManager.java   |  55 ++++++
 .../{mergetree => }/compact/CompactResult.java     |   2 +-
 .../table/store/file/compact/CompactTask.java      |  75 +++++++
 .../file/{mergetree => }/compact/CompactUnit.java  |   2 +-
 .../store/file/data/AppendOnlyCompactManager.java  | 131 +++++++++++++
 .../table/store/file/data/AppendOnlyWriter.java    | 188 ++++++++++++++----
 .../store/file/mergetree/MergeTreeWriter.java      |   8 +-
 .../file/mergetree/compact/CompactStrategy.java    |   1 +
 ...ctManager.java => MergeTreeCompactManager.java} |  50 ++---
 ...{CompactTask.java => MergeTreeCompactTask.java} | 116 ++++-------
 .../mergetree/compact/UniversalCompaction.java     |   1 +
 .../file/operation/AbstractFileStoreScan.java      |  12 +-
 .../file/operation/AppendOnlyFileStoreWrite.java   |  69 ++++++-
 .../table/store/file/operation/FileStoreWrite.java |   2 +-
 .../file/operation/KeyValueFileStoreWrite.java     |  37 ++--
 .../flink/table/store/table/sink/TableCompact.java |   2 +-
 .../file/data/AppendOnlyCompactManagerTest.java    | 216 +++++++++++++++++++++
 .../store/file/data/AppendOnlyWriterTest.java      | 158 ++++++++++++---
 .../table/store/file/data/DataFileTestUtils.java   |  15 ++
 .../store/file/format/FileFormatSuffixTest.java    |  13 +-
 .../ManifestCommittableSerializerTest.java         |   2 +-
 .../table/store/file/mergetree/LevelsTest.java     |   2 +-
 .../table/store/file/mergetree/MergeTreeTest.java  |  25 ++-
 ...rTest.java => MergeTreeCompactManagerTest.java} |  15 +-
 .../mergetree/compact/UniversalCompactionTest.java |   1 +
 .../flink/table/store/kafka/KafkaLogTestUtils.java |   2 +-
 41 files changed, 1131 insertions(+), 240 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 304e7585..09cdf87e 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -32,17 +32,29 @@
             <td>Boolean</td>
             <td>Whether to force a compaction before commit.</td>
         </tr>
+        <tr>
+            <td><h5>compaction.early-max.file-num</h5></td>
+            <td style="word-wrap: break-word;">50</td>
+            <td>Integer</td>
+            <td>For file set [f_0,...,f_N], the maximum file number to trigger a compaction for append-only table, even if sum(size(f_i)) &lt; targetFileSize. This value avoids accumulating too many small files, which slows down performance.</td>
+        </tr>
         <tr>
             <td><h5>compaction.max-size-amplification-percent</h5></td>
             <td style="word-wrap: break-word;">200</td>
             <td>Integer</td>
-            <td>The size amplification is defined as the amount (in percentage) of additional storage needed to store a single byte of data in the merge tree.</td>
+            <td>The size amplification is defined as the amount (in percentage) of additional storage needed to store a single byte of data in the merge tree for changelog mode table.</td>
+        </tr>
+        <tr>
+            <td><h5>compaction.min.file-num</h5></td>
+            <td style="word-wrap: break-word;">5</td>
+            <td>Integer</td>
+            <td>For file set [f_0,...,f_N], the minimum file number which satisfies sum(size(f_i)) &gt;= targetFileSize to trigger a compaction for append-only table. This value avoids compacting almost-full files, which is not cost-effective.</td>
         </tr>
         <tr>
             <td><h5>compaction.size-ratio</h5></td>
             <td style="word-wrap: break-word;">1</td>
             <td>Integer</td>
-            <td>Percentage flexibility while comparing sorted run size. If the candidate sorted run(s) size is 1% smaller than the next sorted run's size, then include next sorted run into this candidate set.</td>
+            <td>Percentage flexibility while comparing sorted run size for changelog mode table. If the candidate sorted run(s) size is 1% smaller than the next sorted run's size, then include next sorted run into this candidate set.</td>
         </tr>
         <tr>
             <td><h5>continuous.discovery-interval</h5></td>
@@ -196,7 +208,7 @@
         </tr>
         <tr>
             <td><h5>write-buffer-size</h5></td>
-            <td style="word-wrap: break-word;">256 mb</td>
+            <td style="word-wrap: break-word;">128 mb</td>
             <td>MemorySize</td>
             <td>Amount of data to build up in memory before converting to a sorted on-disk file.</td>
         </tr>
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableITCase.java
similarity index 67%
rename from flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableTest.java
rename to flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableITCase.java
index 682c623c..70f06993 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/AppendOnlyTableITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.store.connector;
 
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.store.file.Snapshot;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 
@@ -33,7 +34,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test case for append-only managed table. */
-public class AppendOnlyTableTest extends FileStoreTableITCase {
+public class AppendOnlyTableITCase extends FileStoreTableITCase {
 
     @Test
     public void testCreateTableWithPrimaryKey() {
@@ -124,17 +125,61 @@ public class AppendOnlyTableTest extends FileStoreTableITCase {
                         Row.of("AAA"), Row.of("AAA"), Row.of("BBB"), Row.of("AAA"));
     }
 
-    private void testRejectChanges(RowKind kind) {
-        List<Row> input = Collections.singletonList(Row.ofKind(kind, 1, "AAA"));
-
-        String id = TestValuesTableFactory.registerData(input);
-        batchSql(
-                "CREATE TABLE source (id INT, data STRING) WITH ('connector'='values', 'bounded'='true', 'data-id'='%s')",
-                id);
+    @Test
+    public void testAutoCompaction() {
+        batchSql("ALTER TABLE append_table SET ('commit.force-compact' = 'true')");
+        batchSql("ALTER TABLE append_table SET ('compaction.min.file-num' = '2')");
+        batchSql("ALTER TABLE append_table SET ('compaction.early-max.file-num' = '4')");
+
+        assertAutoCompaction(
+                "INSERT INTO append_table VALUES (1, 'AAA'), (2, 'BBB')",
+                1L,
+                Snapshot.CommitKind.APPEND);
+        assertAutoCompaction(
+                "INSERT INTO append_table VALUES (3, 'CCC'), (4, 'DDD')",
+                2L,
+                Snapshot.CommitKind.APPEND);
+        assertAutoCompaction(
+                "INSERT INTO append_table VALUES (1, 'AAA'), (2, 'BBB'), (3, 'CCC'), (4, 'DDD')",
+                3L,
+                Snapshot.CommitKind.APPEND);
+        assertAutoCompaction(
+                "INSERT INTO append_table VALUES (5, 'EEE'), (6, 'FFF')",
+                5L,
+                Snapshot.CommitKind.COMPACT);
+        assertAutoCompaction(
+                "INSERT INTO append_table VALUES (7, 'HHH'), (8, 'III')",
+                6L,
+                Snapshot.CommitKind.APPEND);
+        assertAutoCompaction(
+                "INSERT INTO append_table VALUES (9, 'JJJ'), (10, 'KKK')",
+                7L,
+                Snapshot.CommitKind.APPEND);
+        assertAutoCompaction(
+                "INSERT INTO append_table VALUES (11, 'LLL'), (12, 'MMM')",
+                9L,
+                Snapshot.CommitKind.COMPACT);
 
-        assertThatThrownBy(() -> batchSql("INSERT INTO append_table SELECT * FROM source"))
-                .hasRootCauseInstanceOf(IllegalStateException.class)
-                .hasRootCauseMessage("Append only writer can not accept row with RowKind %s", kind);
+        List<Row> rows = batchSql("SELECT * FROM append_table");
+        assertThat(rows.size()).isEqualTo(16);
+        assertThat(rows)
+                .containsExactlyInAnyOrder(
+                        Row.of(1, "AAA"),
+                        Row.of(2, "BBB"),
+                        Row.of(3, "CCC"),
+                        Row.of(4, "DDD"),
+                        Row.of(1, "AAA"),
+                        Row.of(2, "BBB"),
+                        Row.of(3, "CCC"),
+                        Row.of(4, "DDD"),
+                        Row.of(5, "EEE"),
+                        Row.of(6, "FFF"),
+                        Row.of(7, "HHH"),
+                        Row.of(8, "III"),
+                        Row.of(9, "JJJ"),
+                        Row.of(10, "KKK"),
+                        Row.of(11, "LLL"),
+                        Row.of(12, "MMM"));
     }
 
     @Test
@@ -155,6 +200,27 @@ public class AppendOnlyTableTest extends FileStoreTableITCase {
     @Override
     protected List<String> ddl() {
         return Collections.singletonList(
-                "CREATE TABLE IF NOT EXISTS append_table (id INT, data STRING) WITH ('write-mode'='append-only')");
+                "CREATE TABLE IF NOT EXISTS append_table (id INT, data STRING) WITH ('write-mode'='append-only', 'commit.force-compact' = 'true')");
+    }
+
+    private void testRejectChanges(RowKind kind) {
+        List<Row> input = Collections.singletonList(Row.ofKind(kind, 1, "AAA"));
+
+        String id = TestValuesTableFactory.registerData(input);
+        batchSql(
+                "CREATE TABLE source (id INT, data STRING) WITH ('connector'='values', 'bounded'='true', 'data-id'='%s')",
+                id);
+
+        assertThatThrownBy(() -> batchSql("INSERT INTO append_table SELECT * FROM source"))
+                .hasRootCauseInstanceOf(IllegalStateException.class)
+                .hasRootCauseMessage("Append only writer can not accept row with RowKind %s", kind);
+    }
+
+    private void assertAutoCompaction(
+            String sql, long expectedSnapshotId, Snapshot.CommitKind expectedCommitKind) {
+        batchSql(sql);
+        Snapshot snapshot = findLatestSnapshot("append_table", true);
+        assertThat(snapshot.id()).isEqualTo(expectedSnapshotId);
+        assertThat(snapshot.commitKind()).isEqualTo(expectedCommitKind);
     }
 }
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java
index 03b5e614..7752421c 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/CommittableSerializerTest.java
@@ -26,7 +26,7 @@ import org.junit.jupiter.api.Test;
 import java.io.IOException;
 
 import static org.apache.flink.table.store.file.manifest.ManifestCommittableSerializerTest.randomIncrement;
-import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
+import static org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactManagerTest.row;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link CommittableSerializer}. */
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/FileCommittableSerializerTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/FileCommittableSerializerTest.java
index 73b66f03..0b010cf4 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/FileCommittableSerializerTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/sink/FileCommittableSerializerTest.java
@@ -26,7 +26,7 @@ import org.junit.jupiter.api.Test;
 import java.io.IOException;
 
 import static org.apache.flink.table.store.file.manifest.ManifestCommittableSerializerTest.randomIncrement;
-import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
+import static org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactManagerTest.row;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link FileCommittableSerializer}. */
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java
index 626bdc38..6025b735 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceReaderTest.java
@@ -26,7 +26,7 @@ import org.junit.jupiter.api.io.TempDir;
 import java.util.Collections;
 
 import static org.apache.flink.table.store.connector.source.FileStoreSourceSplitSerializerTest.newSourceSplit;
-import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
+import static org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactManagerTest.row;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Unit tests for the {@link FileStoreSourceReader}. */
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
index d25e9119..aece5b87 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitGeneratorTest.java
@@ -37,7 +37,7 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.stream.Collectors;
 
-import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
+import static org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactManagerTest.row;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link FileStoreSourceSplitGenerator}. */
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
index 258f8da6..214fbca8 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitReaderTest.java
@@ -45,7 +45,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.apache.flink.table.store.connector.source.FileStoreSourceSplitSerializerTest.newSourceSplit;
-import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
+import static org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactManagerTest.row;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializerTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializerTest.java
index 90c595e4..863a2047 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializerTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitSerializerTest.java
@@ -30,7 +30,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
 
-import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
+import static org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactManagerTest.row;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Unit tests for the {@link FileStoreSourceSplitSerializer}. */
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitStateTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitStateTest.java
index e6c72d63..539486b2 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitStateTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/FileStoreSourceSplitStateTest.java
@@ -26,7 +26,7 @@ import java.util.Arrays;
 
 import static org.apache.flink.table.store.connector.source.FileStoreSourceSplitSerializerTest.newFile;
 import static org.apache.flink.table.store.connector.source.FileStoreSourceSplitSerializerTest.newSourceSplit;
-import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
+import static org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactManagerTest.row;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Unit tests for the {@link FileStoreSourceSplitState}. */
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializerTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializerTest.java
index a3ab6a2f..0a5a3862 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializerTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/PendingSplitsCheckpointSerializerTest.java
@@ -28,7 +28,7 @@ import java.util.Collections;
 
 import static org.apache.flink.table.store.connector.source.FileStoreSourceSplitSerializerTest.newFile;
 import static org.apache.flink.table.store.connector.source.FileStoreSourceSplitSerializerTest.newSourceSplit;
-import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
+import static org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactManagerTest.row;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Unit tests for the {@link PendingSplitsCheckpointSerializer}. */
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumeratorTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumeratorTest.java
index 6a60b140..8b28c155 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumeratorTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/StaticFileStoreSplitEnumeratorTest.java
@@ -27,7 +27,7 @@ import java.util.Arrays;
 
 import static org.apache.flink.table.store.connector.source.FileStoreSourceSplitSerializerTest.newFile;
 import static org.apache.flink.table.store.connector.source.FileStoreSourceSplitSerializerTest.newSourceSplit;
-import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
+import static org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactManagerTest.row;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Unit tests for the {@link StaticFileStoreSplitEnumerator}. */
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
index 16134667..94a02856 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/source/TestChangelogDataReadWrite.java
@@ -49,6 +49,7 @@ import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Preconditions;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
@@ -139,7 +140,8 @@ public class TestChangelogDataReadWrite {
     }
 
     public RecordWriter<KeyValue> createMergeTreeWriter(BinaryRowData partition, int bucket) {
-        CoreOptions options = new CoreOptions(new Configuration());
+        CoreOptions options =
+                new CoreOptions(Collections.singletonMap(CoreOptions.FILE_FORMAT.key(), "avro"));
         RecordWriter<KeyValue> writer =
                 new KeyValueFileStoreWrite(
                                 new SchemaManager(tablePath),
@@ -148,7 +150,6 @@ public class TestChangelogDataReadWrite {
                                 VALUE_TYPE,
                                 () -> COMPARATOR,
                                 new DeduplicateMergeFunction(),
-                                avro,
                                 pathFactory,
                                 snapshotManager,
                                 null, // not used, we only create an empty writer
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index a45b5b6f..3cc3ff27 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -214,17 +214,36 @@ public class CoreOptions implements Serializable {
                     .defaultValue(200)
                     .withDescription(
                             "The size amplification is defined as the amount (in percentage) of additional storage "
-                                    + "needed to store a single byte of data in the merge tree.");
+                                    + "needed to store a single byte of data in the merge tree for changelog mode table.");
 
     public static final ConfigOption<Integer> COMPACTION_SIZE_RATIO =
             ConfigOptions.key("compaction.size-ratio")
                     .intType()
                     .defaultValue(1)
                     .withDescription(
-                            "Percentage flexibility while comparing sorted run size. If the candidate sorted run(s) "
+                            "Percentage flexibility while comparing sorted run size for changelog mode table. If the candidate sorted run(s) "
                                     + "size is 1% smaller than the next sorted run's size, then include next sorted run "
                                     + "into this candidate set.");
 
+    public static final ConfigOption<Integer> COMPACTION_MIN_FILE_NUM =
+            ConfigOptions.key("compaction.min.file-num")
+                    .intType()
+                    .defaultValue(5)
+                    .withDescription(
+                            "For file set [f_0,...,f_N], the minimum file number which satisfies "
+                                    + "sum(size(f_i)) >= targetFileSize to trigger a compaction for "
+                                    + "append-only table. This value avoids compacting almost-full files, "
+                                    + "which is not cost-effective.");
+
+    public static final ConfigOption<Integer> COMPACTION_MAX_FILE_NUM =
+            ConfigOptions.key("compaction.early-max.file-num")
+                    .intType()
+                    .defaultValue(50)
+                    .withDescription(
+                            "For file set [f_0,...,f_N], the maximum file number to trigger a compaction "
+                                    + "for append-only table, even if sum(size(f_i)) < targetFileSize. This value "
+                                    + "avoids accumulating too many small files, which slows down performance.");
+
     public static final ConfigOption<Boolean> CHANGELOG_FILE =
             ConfigOptions.key("changelog-file")
                     .booleanType()
@@ -398,10 +417,18 @@ public class CoreOptions implements Serializable {
         return options.get(COMPACTION_MAX_SIZE_AMPLIFICATION_PERCENT);
     }
 
-    public int sizeRatio() {
+    public int sortedRunSizeRatio() {
         return options.get(COMPACTION_SIZE_RATIO);
     }
 
+    public int minFileNum() {
+        return options.get(COMPACTION_MIN_FILE_NUM);
+    }
+
+    public int maxFileNum() {
+        return options.get(COMPACTION_MAX_FILE_NUM);
+    }
+
     public boolean enableChangelogFile() {
         return options.get(CHANGELOG_FILE);
     }
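
The two new options combine with the existing target file size into a single trigger predicate, which the append-only compact manager further down evaluates over a sliding window of candidate files. A minimal illustrative sketch (the standalone helper name below is made up for illustration and is not part of the patch):

    // Candidates f_0..f_N are scanned in sequence order; compaction fires once enough
    // bytes AND enough files have accumulated, or once the file count alone is too high.
    static boolean triggersCompaction(
            long totalFileSize, int fileNum, long targetFileSize, int minFileNum, int maxFileNum) {
        return (totalFileSize >= targetFileSize && fileNum >= minFileNum) || fileNum >= maxFileNum;
    }
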
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
index 8a803a74..12ca83cd 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
@@ -58,13 +58,17 @@ public class AppendOnlyFileStore extends AbstractFileStore<RowData> {
     @Override
     public AppendOnlyFileStoreWrite newWrite() {
         return new AppendOnlyFileStoreWrite(
+                newRead(),
                 schemaId,
                 rowType,
                 options.fileFormat(),
                 pathFactory(),
                 snapshotManager(),
                 newScan(true),
-                options.targetFileSize());
+                options.targetFileSize(),
+                options.minFileNum(),
+                options.maxFileNum(),
+                options.commitForceCompact());
     }
 
     private AppendOnlyFileStoreScan newScan(boolean checkNumOfBuckets) {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
index 9c7ae86e..77417e99 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
@@ -86,7 +86,6 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
                 valueType,
                 keyComparatorSupplier,
                 mergeFunction,
-                options.fileFormat(),
                 pathFactory(),
                 snapshotManager(),
                 newScan(true),
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java
new file mode 100644
index 00000000..2063e17b
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.compact;
+
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+/** Manager to submit compaction task. */
+public abstract class CompactManager {
+
+    protected final ExecutorService executor;
+
+    protected Future<CompactResult> taskFuture;
+
+    public CompactManager(ExecutorService executor) {
+        this.executor = executor;
+    }
+
+    /** Submit a new compaction task. */
+    public abstract void submitCompaction();
+
+    public boolean isCompactionFinished() {
+        return taskFuture == null;
+    }
+
+    public Optional<CompactResult> finishCompaction(boolean blocking)
+            throws ExecutionException, InterruptedException {
+        if (taskFuture != null) {
+            if (blocking || taskFuture.isDone()) {
+                CompactResult result = taskFuture.get();
+                taskFuture = null;
+                return Optional.of(result);
+            }
+        }
+        return Optional.empty();
+    }
+}
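
Both writer implementations below drive this base class the same way; the pattern, sketched with the patch's own method names (updateFiles is a hypothetical placeholder for the caller's bookkeeping, not a method in the patch):

    // Submit only when no task is pending, then drain the result:
    // non-blocking on a normal commit, blocking on sync() or force-compact.
    if (compactManager.isCompactionFinished()) {
        compactManager.submitCompaction();
    }
    Optional<CompactResult> result = compactManager.finishCompaction(false);
    result.ifPresent(r -> updateFiles(r.before(), r.after()));
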
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactResult.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactResult.java
similarity index 94%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactResult.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactResult.java
index e5b6e826..865ac251 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactResult.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactResult.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.file.mergetree.compact;
+package org.apache.flink.table.store.file.compact;
 
 import org.apache.flink.table.store.file.data.DataFileMeta;
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactTask.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactTask.java
new file mode 100644
index 00000000..a129c28a
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactTask.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.compact;
+
+import org.apache.flink.table.store.file.data.DataFileMeta;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+
+/** Compact task. */
+public abstract class CompactTask implements Callable<CompactResult> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CompactTask.class);
+
+    private final List<DataFileMeta> inputs;
+
+    public CompactTask(List<DataFileMeta> inputs) {
+        this.inputs = inputs;
+    }
+
+    @Override
+    public CompactResult call() throws Exception {
+        long startMillis = System.currentTimeMillis();
+        CompactResult result = doCompact(inputs);
+
+        if (LOG.isDebugEnabled()) {
+            logMetric(startMillis, result.before(), result.after());
+        }
+
+        return result;
+    }
+
+    protected String logMetric(
+            long startMillis, List<DataFileMeta> compactBefore, List<DataFileMeta> compactAfter) {
+        return String.format(
+                "Done compacting %d files to %d files in %dms. "
+                        + "Rewrite input file size = %d, output file size = %d",
+                compactBefore.size(),
+                compactAfter.size(),
+                System.currentTimeMillis() - startMillis,
+                collectRewriteSize(compactBefore),
+                collectRewriteSize(compactAfter));
+    }
+
+    /**
+     * Perform compaction.
+     *
+     * @param inputs the candidate files to be compacted
+     * @return {@link CompactResult} of compact before and compact after files.
+     */
+    protected abstract CompactResult doCompact(List<DataFileMeta> inputs) throws Exception;
+
+    private long collectRewriteSize(List<DataFileMeta> files) {
+        return files.stream().mapToLong(DataFileMeta::fileSize).sum();
+    }
+}
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactUnit.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactUnit.java
similarity index 96%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactUnit.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactUnit.java
index af8510e1..f42b89d2 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactUnit.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactUnit.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.file.mergetree.compact;
+package org.apache.flink.table.store.file.compact;
 
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.mergetree.LevelSortedRun;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyCompactManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyCompactManager.java
new file mode 100644
index 00000000..e30b0e43
--- /dev/null
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyCompactManager.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.data;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.store.file.compact.CompactManager;
+import org.apache.flink.table.store.file.compact.CompactResult;
+import org.apache.flink.table.store.file.compact.CompactTask;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+
+/** Compact manager for {@link org.apache.flink.table.store.file.AppendOnlyFileStore}. */
+public class AppendOnlyCompactManager extends CompactManager {
+
+    private final int minFileNum;
+    private final int maxFileNum;
+    private final long targetFileSize;
+    private final CompactRewriter rewriter;
+    private final LinkedList<DataFileMeta> toCompact;
+
+    public AppendOnlyCompactManager(
+            ExecutorService executor,
+            LinkedList<DataFileMeta> toCompact,
+            int minFileNum,
+            int maxFileNum,
+            long targetFileSize,
+            CompactRewriter rewriter) {
+        super(executor);
+        this.toCompact = toCompact;
+        this.maxFileNum = maxFileNum;
+        this.minFileNum = minFileNum;
+        this.targetFileSize = targetFileSize;
+        this.rewriter = rewriter;
+    }
+
+    @Override
+    public void submitCompaction() {
+        if (taskFuture != null) {
+            throw new IllegalStateException(
+                    "Please finish the previous compaction before submitting new one.");
+        }
+        pickCompactBefore()
+                .ifPresent(
+                        (inputs) ->
+                                taskFuture =
+                                        executor.submit(
+                                                new AppendOnlyCompactTask(inputs, rewriter)));
+    }
+
+    @VisibleForTesting
+    Optional<List<DataFileMeta>> pickCompactBefore() {
+        long totalFileSize = 0L;
+        int fileNum = 0;
+        LinkedList<DataFileMeta> candidates = new LinkedList<>();
+
+        while (!toCompact.isEmpty()) {
+            DataFileMeta file = toCompact.pollFirst();
+            candidates.add(file);
+            totalFileSize += file.fileSize();
+            fileNum++;
+            if ((totalFileSize >= targetFileSize && fileNum >= minFileNum)
+                    || fileNum >= maxFileNum) {
+                return Optional.of(candidates);
+            } else if (totalFileSize >= targetFileSize) {
+                // left pointer shift one pos to right
+                DataFileMeta removed = candidates.pollFirst();
+                assert removed != null;
+                totalFileSize -= removed.fileSize();
+                fileNum--;
+            }
+        }
+        toCompact.addAll(candidates);
+        return Optional.empty();
+    }
+
+    @VisibleForTesting
+    LinkedList<DataFileMeta> getToCompact() {
+        return toCompact;
+    }
+
+    /** A {@link CompactTask} impl for append-only table. */
+    public static class AppendOnlyCompactTask extends CompactTask {
+
+        private final CompactRewriter rewriter;
+
+        public AppendOnlyCompactTask(List<DataFileMeta> toCompact, CompactRewriter rewriter) {
+            super(toCompact);
+            this.rewriter = rewriter;
+        }
+
+        @Override
+        protected CompactResult doCompact(List<DataFileMeta> inputs) throws Exception {
+            List<DataFileMeta> compactAfter = rewriter.rewrite(inputs);
+            return new CompactResult() {
+                @Override
+                public List<DataFileMeta> before() {
+                    return inputs;
+                }
+
+                @Override
+                public List<DataFileMeta> after() {
+                    return compactAfter;
+                }
+            };
+        }
+    }
+
+    /** Compact rewriter for append-only table. */
+    public interface CompactRewriter {
+        List<DataFileMeta> rewrite(List<DataFileMeta> compactBefore) throws Exception;
+    }
+}
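
To make pickCompactBefore() concrete, a small walk-through under assumed numbers (newFile(name, size) is a hypothetical helper building a DataFileMeta of the given size; none of this is in the patch):

    // targetFileSize = 100, minFileNum = 3, maxFileNum = 5
    LinkedList<DataFileMeta> toCompact = new LinkedList<>(Arrays.asList(
            newFile("f0", 90), newFile("f1", 20), newFile("f2", 20), newFile("f3", 20)));
    // [f0, f1] reaches 110 >= 100 bytes but holds only 2 < minFileNum files, so the window
    // slides past f0; [f1, f2, f3] stays at 60 < 100, so pickCompactBefore() returns
    // Optional.empty() and puts the candidates back on the queue. Appending newFile("f4", 50)
    // later lets [f1, f2, f3, f4] reach 110 bytes with 4 >= minFileNum files, so that set
    // is picked as the next compaction unit.
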
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java
index 8a7105e7..305a7133 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/data/AppendOnlyWriter.java
@@ -19,6 +19,8 @@
 
 package org.apache.flink.table.store.file.data;
 
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.store.file.mergetree.Increment;
@@ -34,26 +36,36 @@ import org.apache.flink.table.store.file.writer.RollingFileWriter;
 import org.apache.flink.table.store.format.FileFormat;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.RowKind;
+import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
+import static org.apache.flink.table.store.file.data.AppendOnlyWriter.RowRollingWriter.createRollingRowWriter;
+
 /**
  * A {@link RecordWriter} implementation that only accepts records which are always insert
  * operations and don't have any unique keys or sort keys.
  */
 public class AppendOnlyWriter implements RecordWriter<RowData> {
+
     private final long schemaId;
+    private final FileFormat fileFormat;
     private final long targetFileSize;
+    private final RowType writeSchema;
     private final DataFilePathFactory pathFactory;
-    private final FieldStatsArraySerializer statsArraySerializer;
-
-    private final FileWriter.Factory<RowData, Metric> fileWriterFactory;
-    private long nextSeqNum;
+    private final AppendOnlyCompactManager compactManager;
+    private final boolean forceCompact;
+    private final LinkedList<DataFileMeta> toCompact;
+    private final List<DataFileMeta> compactBefore;
+    private final List<DataFileMeta> compactAfter;
+    private final LongCounter seqNumCounter;
 
     private RowRollingWriter writer;
 
@@ -62,23 +74,29 @@ public class AppendOnlyWriter implements RecordWriter<RowData> {
             FileFormat fileFormat,
             long targetFileSize,
             RowType writeSchema,
-            long maxWroteSeqNumber,
+            LinkedList<DataFileMeta> restoredFiles,
+            AppendOnlyCompactManager compactManager,
+            boolean forceCompact,
             DataFilePathFactory pathFactory) {
         this.schemaId = schemaId;
+        this.fileFormat = fileFormat;
         this.targetFileSize = targetFileSize;
+        this.writeSchema = writeSchema;
         this.pathFactory = pathFactory;
-        this.statsArraySerializer = new FieldStatsArraySerializer(writeSchema);
-
-        // Initialize the file writer factory to write records and generic metric.
-        this.fileWriterFactory =
-                MetricFileWriter.createFactory(
-                        fileFormat.createWriterFactory(writeSchema),
-                        Function.identity(),
+        this.compactManager = compactManager;
+        this.forceCompact = forceCompact;
+        this.toCompact = restoredFiles;
+        this.compactBefore = new ArrayList<>();
+        this.compactAfter = new ArrayList<>();
+        this.seqNumCounter = new LongCounter(getMaxSequenceNumber(restoredFiles) + 1);
+        this.writer =
+                createRollingRowWriter(
+                        schemaId,
+                        fileFormat,
+                        targetFileSize,
                         writeSchema,
-                        fileFormat.createStatsExtractor(writeSchema).orElse(null));
-
-        this.nextSeqNum = maxWroteSeqNumber + 1;
-        this.writer = createRollingRowWriter();
+                        pathFactory,
+                        seqNumCounter);
     }
 
     @Override
@@ -93,21 +111,32 @@ public class AppendOnlyWriter implements RecordWriter<RowData> {
     @Override
     public Increment prepareCommit() throws Exception {
         List<DataFileMeta> newFiles = new ArrayList<>();
-
         if (writer != null) {
             writer.close();
             newFiles.addAll(writer.result());
 
             // Reopen the writer to accept further records.
-            writer = createRollingRowWriter();
+            seqNumCounter.resetLocal();
+            seqNumCounter.add(getMaxSequenceNumber(newFiles) + 1);
+            writer =
+                    createRollingRowWriter(
+                            schemaId,
+                            fileFormat,
+                            targetFileSize,
+                            writeSchema,
+                            pathFactory,
+                            seqNumCounter);
         }
-
-        return Increment.forAppend(newFiles);
+        // add new generated files
+        toCompact.addAll(newFiles);
+        submitCompaction();
+        finishCompaction(forceCompact);
+        return drainIncrement(newFiles);
     }
 
     @Override
     public void sync() throws Exception {
-        // Do nothing here, as this writer don't introduce any async compaction thread currently.
+        finishCompaction(true);
     }
 
     @Override
@@ -126,31 +155,124 @@ public class AppendOnlyWriter implements RecordWriter<RowData> {
         return result;
     }
 
-    private RowRollingWriter createRollingRowWriter() {
-        return new RowRollingWriter(
-                () -> new RowFileWriter(fileWriterFactory, pathFactory.newPath()), targetFileSize);
+    private static long getMaxSequenceNumber(List<DataFileMeta> fileMetas) {
+        return fileMetas.stream()
+                .map(DataFileMeta::maxSequenceNumber)
+                .max(Long::compare)
+                .orElse(-1L);
+    }
+
+    private void submitCompaction() throws ExecutionException, InterruptedException {
+        finishCompaction(false);
+        if (compactManager.isCompactionFinished() && !toCompact.isEmpty()) {
+            compactManager.submitCompaction();
+        }
+    }
+
+    private void finishCompaction(boolean blocking)
+            throws ExecutionException, InterruptedException {
+        compactManager
+                .finishCompaction(blocking)
+                .ifPresent(
+                        result -> {
+                            compactBefore.addAll(result.before());
+                            compactAfter.addAll(result.after());
+                            if (!result.after().isEmpty()) {
+                                // if the last compacted file is still small,
+                                // add it back to the head
+                                DataFileMeta lastFile =
+                                        result.after().get(result.after().size() - 1);
+                                if (lastFile.fileSize() < targetFileSize) {
+                                    toCompact.offerFirst(lastFile);
+                                }
+                            }
+                        });
+    }
+
+    private Increment drainIncrement(List<DataFileMeta> newFiles) {
+        Increment increment =
+                new Increment(
+                        newFiles, new ArrayList<>(compactBefore), new ArrayList<>(compactAfter));
+        compactBefore.clear();
+        compactAfter.clear();
+        return increment;
     }
 
-    private class RowRollingWriter extends RollingFileWriter<RowData, DataFileMeta> {
+    @VisibleForTesting
+    List<DataFileMeta> getToCompact() {
+        return toCompact;
+    }
+
+    /** Rolling file writer for append-only table. */
+    public static class RowRollingWriter extends RollingFileWriter<RowData, DataFileMeta> {
 
         public RowRollingWriter(Supplier<RowFileWriter> writerFactory, long targetFileSize) {
             super(writerFactory, targetFileSize);
         }
+
+        public static RowRollingWriter createRollingRowWriter(
+                long schemaId,
+                FileFormat fileFormat,
+                long targetFileSize,
+                RowType writeSchema,
+                DataFilePathFactory pathFactory,
+                LongCounter seqNumCounter) {
+            return new RowRollingWriter(
+                    () ->
+                            new RowFileWriter(
+                                    MetricFileWriter.createFactory(
+                                            fileFormat.createWriterFactory(writeSchema),
+                                            Function.identity(),
+                                            writeSchema,
+                                            fileFormat
+                                                    .createStatsExtractor(writeSchema)
+                                                    .orElse(null)),
+                                    pathFactory.newPath(),
+                                    writeSchema,
+                                    schemaId,
+                                    seqNumCounter),
+                    targetFileSize);
+        }
+
+        public List<DataFileMeta> write(CloseableIterator<RowData> iterator) throws Exception {
+            try {
+                super.write(iterator);
+                super.close();
+                return super.result();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            } finally {
+                iterator.close();
+            }
+        }
     }
 
-    private class RowFileWriter extends BaseFileWriter<RowData, DataFileMeta> {
-        private final long minSeqNum;
+    /**
+     * A {@link BaseFileWriter} impl with a counter with an initial value to record each row's
+     * sequence number.
+     */
+    public static class RowFileWriter extends BaseFileWriter<RowData, DataFileMeta> {
 
-        public RowFileWriter(FileWriter.Factory<RowData, Metric> writerFactory, Path path) {
+        private final FieldStatsArraySerializer statsArraySerializer;
+        private final long schemaId;
+        private final LongCounter seqNumCounter;
+
+        public RowFileWriter(
+                FileWriter.Factory<RowData, Metric> writerFactory,
+                Path path,
+                RowType writeSchema,
+                long schemaId,
+                LongCounter seqNumCounter) {
             super(writerFactory, path);
-            this.minSeqNum = nextSeqNum;
+            this.statsArraySerializer = new FieldStatsArraySerializer(writeSchema);
+            this.schemaId = schemaId;
+            this.seqNumCounter = seqNumCounter;
         }
 
         @Override
         public void write(RowData row) throws IOException {
             super.write(row);
-
-            nextSeqNum += 1;
+            seqNumCounter.add(1L);
         }
 
         @Override
@@ -162,8 +284,8 @@ public class AppendOnlyWriter implements RecordWriter<RowData> {
                     FileUtils.getFileSize(path),
                     recordCount(),
                     stats,
-                    minSeqNum,
-                    Math.max(minSeqNum, nextSeqNum - 1),
+                    seqNumCounter.getLocalValue() - super.recordCount(),
+                    seqNumCounter.getLocalValue() - 1,
                     schemaId);
         }
     }
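
Pieced together from the hunks above, the commit-time flow of the rewritten writer is roughly the following (a condensed sketch, not literal patch code; rollCurrentWriter() stands in for the close-and-reopen logic shown in prepareCommit()):

    Increment prepareCommit() throws Exception {
        List<DataFileMeta> newFiles = rollCurrentWriter(); // close the rolling writer, collect files, reopen
        toCompact.addAll(newFiles);                        // queue new files as compaction candidates
        submitCompaction();                                // submit a task only if none is running
        finishCompaction(forceCompact);                    // block only when commit.force-compact is set
        return drainIncrement(newFiles);                   // new files plus compactBefore/compactAfter
    }

A compacted file that is still smaller than the target size is pushed back to the head of toCompact, so it can be merged again with files written later.
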
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
index 9c5cd32d..50fea94f 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
@@ -22,11 +22,11 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.util.MemorySegmentPool;
 import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.compact.CompactManager;
+import org.apache.flink.table.store.file.compact.CompactResult;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.data.DataFileWriter;
 import org.apache.flink.table.store.file.memory.MemoryOwner;
-import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
-import org.apache.flink.table.store.file.mergetree.compact.CompactResult;
 import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
 import org.apache.flink.table.store.file.writer.RecordWriter;
 import org.apache.flink.table.types.logical.RowType;
@@ -228,12 +228,12 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner {
     private void submitCompaction() throws Exception {
         finishCompaction(false);
         if (compactManager.isCompactionFinished()) {
-            compactManager.submitCompaction(levels);
+            compactManager.submitCompaction();
         }
     }
 
     private void finishCompaction(boolean blocking) throws Exception {
-        Optional<CompactResult> result = compactManager.finishCompaction(levels, blocking);
+        Optional<CompactResult> result = compactManager.finishCompaction(blocking);
         result.ifPresent(this::updateCompactResult);
     }
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactStrategy.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactStrategy.java
index a23dec01..50addb42 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactStrategy.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactStrategy.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.store.file.mergetree.compact;
 
+import org.apache.flink.table.store.file.compact.CompactUnit;
 import org.apache.flink.table.store.file.mergetree.LevelSortedRun;
 
 import java.util.List;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactManager.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManager.java
similarity index 80%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactManager.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManager.java
index 294e54fd..17c160b2 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactManager.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManager.java
@@ -19,6 +19,9 @@
 package org.apache.flink.table.store.file.mergetree.compact;
 
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.compact.CompactManager;
+import org.apache.flink.table.store.file.compact.CompactResult;
+import org.apache.flink.table.store.file.compact.CompactUnit;
 import org.apache.flink.table.store.file.mergetree.Levels;
 
 import org.slf4j.Logger;
@@ -28,15 +31,14 @@ import java.util.Comparator;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 
-/** Manager to submit compaction task. */
-public class CompactManager {
+/** Compact manager for {@link org.apache.flink.table.store.file.KeyValueFileStore}. */
+public class MergeTreeCompactManager extends CompactManager {
 
-    private static final Logger LOG = LoggerFactory.getLogger(CompactManager.class);
+    private static final Logger LOG = LoggerFactory.getLogger(MergeTreeCompactManager.class);
 
-    private final ExecutorService executor;
+    private final Levels levels;
 
     private final CompactStrategy strategy;
 
@@ -46,27 +48,23 @@ public class CompactManager {
 
     private final CompactRewriter rewriter;
 
-    private Future<CompactResult> taskFuture;
-
-    public CompactManager(
+    public MergeTreeCompactManager(
             ExecutorService executor,
+            Levels levels,
             CompactStrategy strategy,
             Comparator<RowData> keyComparator,
             long minFileSize,
             CompactRewriter rewriter) {
-        this.executor = executor;
+        super(executor);
+        this.levels = levels;
+        this.strategy = strategy;
         this.minFileSize = minFileSize;
         this.keyComparator = keyComparator;
-        this.strategy = strategy;
         this.rewriter = rewriter;
     }
 
-    public boolean isCompactionFinished() {
-        return taskFuture == null;
-    }
-
-    /** Submit a new compaction task. */
-    public void submitCompaction(Levels levels) {
+    @Override
+    public void submitCompaction() {
         if (taskFuture != null) {
             throw new IllegalStateException(
                     "Please finish the previous compaction before submitting 
new one.");
@@ -107,7 +105,8 @@ public class CompactManager {
     }
 
     private void submitCompaction(CompactUnit unit, boolean dropDelete) {
-        CompactTask task = new CompactTask(keyComparator, minFileSize, rewriter, unit, dropDelete);
+        MergeTreeCompactTask task =
+                new MergeTreeCompactTask(keyComparator, minFileSize, rewriter, unit, dropDelete);
         if (LOG.isDebugEnabled()) {
             LOG.debug(
                     "Pick these files (name, level, size) for compaction: {}",
@@ -123,22 +122,11 @@ public class CompactManager {
     }
 
     /** Finish current task, and update result files to {@link Levels}. */
-    public Optional<CompactResult> finishCompaction(Levels levels, boolean blocking)
+    @Override
+    public Optional<CompactResult> finishCompaction(boolean blocking)
             throws ExecutionException, InterruptedException {
-        Optional<CompactResult> result = finishCompaction(blocking);
+        Optional<CompactResult> result = super.finishCompaction(blocking);
         result.ifPresent(r -> levels.update(r.before(), r.after()));
         return result;
     }
-
-    private Optional<CompactResult> finishCompaction(boolean blocking)
-            throws ExecutionException, InterruptedException {
-        if (taskFuture != null) {
-            if (blocking || taskFuture.isDone()) {
-                CompactResult result = taskFuture.get();
-                taskFuture = null;
-                return Optional.of(result);
-            }
-        }
-        return Optional.empty();
-    }
 }
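For orientation, the hunks above only show how MergeTreeCompactManager now delegates to a shared base class, org.apache.flink.table.store.file.compact.CompactManager, whose full listing is added elsewhere in this commit. The following is a minimal sketch of that base-class contract, reconstructed from the members removed above (the executor, the taskFuture, isCompactionFinished, and the private finishCompaction helper); it is illustrative only, and the committed file may differ in detail.

    // Illustrative sketch only -- not the committed CompactManager.java.
    // Reconstructed from the logic that MergeTreeCompactManager no longer carries itself.
    import java.util.Optional;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Future;

    public abstract class CompactManager {

        protected final ExecutorService executor;
        protected Future<CompactResult> taskFuture;

        protected CompactManager(ExecutorService executor) {
            this.executor = executor;
        }

        /** Compaction counts as finished when no task is currently tracked. */
        public boolean isCompactionFinished() {
            return taskFuture == null;
        }

        /** Pick files and submit a compaction task to the executor, if any are eligible. */
        public abstract void submitCompaction();

        /** Return the result of the tracked task, waiting for it when blocking is requested. */
        public Optional<CompactResult> finishCompaction(boolean blocking)
                throws ExecutionException, InterruptedException {
            if (taskFuture != null && (blocking || taskFuture.isDone())) {
                CompactResult result = taskFuture.get();
                taskFuture = null;
                return Optional.of(result);
            }
            return Optional.empty();
        }
    }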
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactTask.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactTask.java
similarity index 59%
rename from flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactTask.java
rename to flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactTask.java
index cac1000b..df0becb1 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactTask.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactTask.java
@@ -19,23 +19,20 @@
 package org.apache.flink.table.store.file.mergetree.compact;
 
 import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.store.file.compact.CompactResult;
+import org.apache.flink.table.store.file.compact.CompactTask;
+import org.apache.flink.table.store.file.compact.CompactUnit;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.mergetree.SortedRun;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
-import java.util.concurrent.Callable;
 
 import static java.util.Collections.singletonList;
 
-/** Compaction task. */
-public class CompactTask implements Callable<CompactResult> {
-
-    private static final Logger LOG = LoggerFactory.getLogger(CompactTask.class);
+/** Compact task for merge tree compaction. */
+public class MergeTreeCompactTask extends CompactTask {
 
     private final long minFileSize;
     private final CompactRewriter rewriter;
@@ -45,41 +42,30 @@ public class CompactTask implements Callable<CompactResult> {
 
     private final boolean dropDelete;
 
-    // metrics
-    private long rewriteInputSize;
-    private long rewriteOutputSize;
-    private int rewriteFilesNum;
+    // metric
     private int upgradeFilesNum;
 
-    public CompactTask(
+    public MergeTreeCompactTask(
             Comparator<RowData> keyComparator,
             long minFileSize,
             CompactRewriter rewriter,
             CompactUnit unit,
             boolean dropDelete) {
+        super(unit.files());
         this.minFileSize = minFileSize;
         this.rewriter = rewriter;
         this.outputLevel = unit.outputLevel();
         this.partitioned = new IntervalPartition(unit.files(), keyComparator).partition();
         this.dropDelete = dropDelete;
 
-        this.rewriteInputSize = 0;
-        this.rewriteOutputSize = 0;
-        this.rewriteFilesNum = 0;
         this.upgradeFilesNum = 0;
     }
 
     @Override
-    public CompactResult call() throws Exception {
-        return compact();
-    }
-
-    private CompactResult compact() throws Exception {
-        long startMillis = System.currentTimeMillis();
-
+    protected CompactResult doCompact(List<DataFileMeta> inputs) throws Exception {
         List<List<SortedRun>> candidate = new ArrayList<>();
-        List<DataFileMeta> before = new ArrayList<>();
-        List<DataFileMeta> after = new ArrayList<>();
+        List<DataFileMeta> compactBefore = new ArrayList<>();
+        List<DataFileMeta> compactAfter = new ArrayList<>();
 
         // Checking the order and compacting adjacent and contiguous files
         // Note: can't skip an intermediate file to compact, this will destroy the overall
@@ -99,40 +85,47 @@ public class CompactTask implements Callable<CompactResult> {
                         candidate.add(singletonList(SortedRun.fromSingle(file)));
                     } else {
                         // Large file appear, rewrite previous and upgrade it
-                        rewrite(candidate, before, after);
-                        upgrade(file, before, after);
+                        rewrite(candidate, compactBefore, compactAfter);
+                        upgrade(file, compactBefore, compactAfter);
                     }
                 }
             }
         }
-        rewrite(candidate, before, after);
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug(
-                    "Done compacting {} files to {} files in {}ms. "
-                            + "Rewrite input size = {}, output size = {}, rewrite file num = {}, upgrade file num = {}",
-                    before.size(),
-                    after.size(),
-                    System.currentTimeMillis() - startMillis,
-                    rewriteInputSize,
-                    rewriteOutputSize,
-                    rewriteFilesNum,
-                    upgradeFilesNum);
-        }
+        rewrite(candidate, compactBefore, compactAfter);
+        return new CompactResult() {
+            @Override
+            public List<DataFileMeta> before() {
+                return compactBefore;
+            }
+
+            @Override
+            public List<DataFileMeta> after() {
+                return compactAfter;
+            }
+        };
+    }
 
-        return result(before, after);
+    @Override
+    protected String logMetric(
+            long startMillis, List<DataFileMeta> compactBefore, List<DataFileMeta> compactAfter) {
+        return String.format(
+                "%s, upgrade file num = %d",
+                super.logMetric(startMillis, compactBefore, compactAfter), upgradeFilesNum);
     }
 
-    private void upgrade(DataFileMeta file, List<DataFileMeta> before, List<DataFileMeta> after) {
+    private void upgrade(
+            DataFileMeta file, List<DataFileMeta> compactBefore, List<DataFileMeta> compactAfter) {
         if (file.level() != outputLevel) {
-            before.add(file);
-            after.add(file.upgrade(outputLevel));
+            compactBefore.add(file);
+            compactAfter.add(file.upgrade(outputLevel));
             upgradeFilesNum++;
         }
     }
 
     private void rewrite(
-            List<List<SortedRun>> candidate, List<DataFileMeta> before, List<DataFileMeta> after)
+            List<List<SortedRun>> candidate,
+            List<DataFileMeta> compactBefore,
+            List<DataFileMeta> compactAfter)
             throws Exception {
         if (candidate.isEmpty()) {
             return;
@@ -143,40 +136,15 @@ public class CompactTask implements Callable<CompactResult> {
                 return;
             } else if (section.size() == 1) {
                 for (DataFileMeta file : section.get(0).files()) {
-                    upgrade(file, before, after);
+                    upgrade(file, compactBefore, compactAfter);
                 }
                 candidate.clear();
                 return;
             }
         }
-        candidate.forEach(
-                runs ->
-                        runs.forEach(
-                                run -> {
-                                    before.addAll(run.files());
-                                    rewriteInputSize +=
-                                            run.files().stream()
-                                                    .mapToLong(DataFileMeta::fileSize)
-                                                    .sum();
-                                    rewriteFilesNum += run.files().size();
-                                }));
+        candidate.forEach(runs -> runs.forEach(run -> compactBefore.addAll(run.files())));
         List<DataFileMeta> result = rewriter.rewrite(outputLevel, dropDelete, candidate);
-        after.addAll(result);
-        rewriteOutputSize += result.stream().mapToLong(DataFileMeta::fileSize).sum();
+        compactAfter.addAll(result);
         candidate.clear();
     }
-
-    private CompactResult result(List<DataFileMeta> before, List<DataFileMeta> after) {
-        return new CompactResult() {
-            @Override
-            public List<DataFileMeta> before() {
-                return before;
-            }
-
-            @Override
-            public List<DataFileMeta> after() {
-                return after;
-            }
-        };
-    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java
index 44d51af7..f4217431 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.store.file.mergetree.compact;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.store.file.compact.CompactUnit;
 import org.apache.flink.table.store.file.mergetree.LevelSortedRun;
 import org.apache.flink.table.store.file.mergetree.SortedRun;
 
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
index 6494c31a..7f7168eb 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
@@ -166,7 +166,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
                 Snapshot snapshot = snapshotManager.snapshot(snapshotId);
                 manifests =
                         isIncremental
-                                ? manifestList.read(snapshot.deltaManifestList())
+                                ? readIncremental(snapshotId)
                                 : snapshot.readAllManifests(manifestList);
             }
         }
@@ -289,4 +289,14 @@ public abstract class AbstractFileStoreScan implements FileStoreScan {
     private List<ManifestEntry> readManifestFileMeta(ManifestFileMeta manifest) {
         return manifestFileFactory.create().read(manifest.fileName());
     }
+
+    private List<ManifestFileMeta> readIncremental(Long snapshotId) {
+        Snapshot snapshot = snapshotManager.snapshot(snapshotId);
+        if (snapshot.commitKind() == Snapshot.CommitKind.APPEND) {
+            return manifestList.read(snapshot.deltaManifestList());
+        }
+        throw new IllegalStateException(
+                String.format(
+                        "Incremental scan does not accept %s snapshot", snapshot.commitKind()));
+    }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
index 7d8cc440..30456991 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreWrite.java
@@ -18,20 +18,26 @@
 
 package org.apache.flink.table.store.file.operation;
 
+import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.compact.CompactResult;
+import org.apache.flink.table.store.file.data.AppendOnlyCompactManager;
 import org.apache.flink.table.store.file.data.AppendOnlyWriter;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.data.DataFilePathFactory;
-import org.apache.flink.table.store.file.mergetree.compact.CompactResult;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.file.utils.RecordReaderIterator;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.file.writer.RecordWriter;
 import org.apache.flink.table.store.format.FileFormat;
+import org.apache.flink.table.store.table.source.Split;
 import org.apache.flink.table.types.logical.RowType;
 
 import javax.annotation.Nullable;
 
+import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
@@ -39,39 +45,51 @@ import java.util.concurrent.ExecutorService;
 /** {@link FileStoreWrite} for {@link org.apache.flink.table.store.file.AppendOnlyFileStore}. */
 public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite<RowData> {
 
+    private final AppendOnlyFileStoreRead read;
     private final long schemaId;
     private final RowType rowType;
     private final FileFormat fileFormat;
     private final FileStorePathFactory pathFactory;
     private final long targetFileSize;
+    private final int minFileNum;
+    private final int maxFileNum;
+    private final boolean commitForceCompact;
 
     public AppendOnlyFileStoreWrite(
+            AppendOnlyFileStoreRead read,
             long schemaId,
             RowType rowType,
             FileFormat fileFormat,
             FileStorePathFactory pathFactory,
             SnapshotManager snapshotManager,
             FileStoreScan scan,
-            long targetFileSize) {
+            long targetFileSize,
+            int minFileNum,
+            int maxFileNum,
+            boolean commitForceCompact) {
         super(snapshotManager, scan);
+        this.read = read;
         this.schemaId = schemaId;
         this.rowType = rowType;
         this.fileFormat = fileFormat;
         this.pathFactory = pathFactory;
         this.targetFileSize = targetFileSize;
+        this.maxFileNum = maxFileNum;
+        this.minFileNum = minFileNum;
+        this.commitForceCompact = commitForceCompact;
     }
 
     @Override
     public RecordWriter<RowData> createWriter(
             BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
         return createWriter(
-                partition, bucket, getMaxSequenceNumber(scanExistingFileMetas(partition, bucket)));
+                partition, bucket, scanExistingFileMetas(partition, bucket), compactExecutor);
     }
 
     @Override
     public RecordWriter<RowData> createEmptyWriter(
             BinaryRowData partition, int bucket, ExecutorService compactExecutor) {
-        return createWriter(partition, bucket, -1L);
+        return createWriter(partition, bucket, Collections.emptyList(), compactExecutor);
     }
 
     @Override
@@ -82,9 +100,48 @@ public class AppendOnlyFileStoreWrite extends AbstractFileStoreWrite<RowData> {
     }
 
     private RecordWriter<RowData> createWriter(
-            BinaryRowData partition, int bucket, long maxSeqNum) {
+            BinaryRowData partition,
+            int bucket,
+            List<DataFileMeta> restoredFiles,
+            ExecutorService compactExecutor) {
+        // let writer and compact manager hold the same reference
+        // and make restore files mutable to update
+        LinkedList<DataFileMeta> restored = new LinkedList<>(restoredFiles);
         DataFilePathFactory factory = pathFactory.createDataFilePathFactory(partition, bucket);
         return new AppendOnlyWriter(
-                schemaId, fileFormat, targetFileSize, rowType, maxSeqNum, factory);
+                schemaId,
+                fileFormat,
+                targetFileSize,
+                rowType,
+                restored,
+                new AppendOnlyCompactManager(
+                        compactExecutor,
+                        restored,
+                        minFileNum,
+                        maxFileNum,
+                        targetFileSize,
+                        compactRewriter(partition, bucket)),
+                commitForceCompact,
+                factory);
+    }
+
+    private AppendOnlyCompactManager.CompactRewriter compactRewriter(
+            BinaryRowData partition, int bucket) {
+        return toCompact -> {
+            if (toCompact.isEmpty()) {
+                return Collections.emptyList();
+            }
+            AppendOnlyWriter.RowRollingWriter rewriter =
+                    AppendOnlyWriter.RowRollingWriter.createRollingRowWriter(
+                            schemaId,
+                            fileFormat,
+                            targetFileSize,
+                            rowType,
+                            pathFactory.createDataFilePathFactory(partition, bucket),
+                            new LongCounter(toCompact.get(0).minSequenceNumber()));
+            return rewriter.write(
+                    new RecordReaderIterator<>(
+                            read.createReader(new Split(partition, bucket, toCompact, false))));
+        };
     }
 }
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
index 862e7596..95cf0991 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/FileStoreWrite.java
@@ -19,8 +19,8 @@
 package org.apache.flink.table.store.file.operation;
 
 import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.compact.CompactResult;
 import org.apache.flink.table.store.file.data.DataFileMeta;
-import org.apache.flink.table.store.file.mergetree.compact.CompactResult;
 import org.apache.flink.table.store.file.writer.RecordWriter;
 
 import javax.annotation.Nullable;
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
index 09cd4604..574c7eb0 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
@@ -22,26 +22,26 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.compact.CompactManager;
+import org.apache.flink.table.store.file.compact.CompactResult;
+import org.apache.flink.table.store.file.compact.CompactUnit;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.data.DataFileReader;
 import org.apache.flink.table.store.file.data.DataFileWriter;
 import org.apache.flink.table.store.file.mergetree.Levels;
 import org.apache.flink.table.store.file.mergetree.MergeTreeReader;
 import org.apache.flink.table.store.file.mergetree.MergeTreeWriter;
-import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
-import org.apache.flink.table.store.file.mergetree.compact.CompactResult;
 import org.apache.flink.table.store.file.mergetree.compact.CompactRewriter;
 import org.apache.flink.table.store.file.mergetree.compact.CompactStrategy;
-import org.apache.flink.table.store.file.mergetree.compact.CompactTask;
-import org.apache.flink.table.store.file.mergetree.compact.CompactUnit;
 import org.apache.flink.table.store.file.mergetree.compact.MergeFunction;
+import org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactManager;
+import org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactTask;
 import org.apache.flink.table.store.file.mergetree.compact.UniversalCompaction;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.store.file.writer.RecordWriter;
-import org.apache.flink.table.store.format.FileFormat;
 import org.apache.flink.table.types.logical.RowType;
 
 import javax.annotation.Nullable;
@@ -69,7 +69,6 @@ public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> {
             RowType valueType,
             Supplier<Comparator<RowData>> keyComparatorSupplier,
             MergeFunction mergeFunction,
-            FileFormat fileFormat,
             FileStorePathFactory pathFactory,
             SnapshotManager snapshotManager,
             FileStoreScan scan,
@@ -77,13 +76,18 @@ public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> {
         super(snapshotManager, scan);
         this.dataFileReaderFactory =
                 new DataFileReader.Factory(
-                        schemaManager, schemaId, keyType, valueType, fileFormat, pathFactory);
+                        schemaManager,
+                        schemaId,
+                        keyType,
+                        valueType,
+                        options.fileFormat(),
+                        pathFactory);
         this.dataFileWriterFactory =
                 new DataFileWriter.Factory(
                         schemaId,
                         keyType,
                         valueType,
-                        fileFormat,
+                        options.fileFormat(),
                         pathFactory,
                         options.targetFileSize());
         this.keyComparatorSupplier = keyComparatorSupplier;
@@ -115,7 +119,8 @@ public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> {
         Levels levels = new Levels(keyComparator, compactFiles, options.numLevels());
         CompactUnit unit =
                 CompactUnit.fromLevelRuns(levels.numberOfLevels() - 1, levels.levelSortedRuns());
-        return new CompactTask(keyComparator, options.targetFileSize(), rewriter, unit, true);
+        return new MergeTreeCompactTask(
+                keyComparator, options.targetFileSize(), rewriter, unit, true);
     }
 
     private MergeTreeWriter createMergeTreeWriter(
@@ -125,6 +130,7 @@ public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> {
             ExecutorService compactExecutor) {
         DataFileWriter dataFileWriter = dataFileWriterFactory.create(partition, bucket);
         Comparator<RowData> keyComparator = keyComparatorSupplier.get();
+        Levels levels = new Levels(keyComparator, restoreFiles, options.numLevels());
         return new MergeTreeWriter(
                 dataFileWriter.keyType(),
                 dataFileWriter.valueType(),
@@ -133,10 +139,11 @@ public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> {
                         bucket,
                         new UniversalCompaction(
                                 options.maxSizeAmplificationPercent(),
-                                options.sizeRatio(),
+                                options.sortedRunSizeRatio(),
                                 options.numSortedRunCompactionTrigger()),
-                        compactExecutor),
-                new Levels(keyComparator, restoreFiles, options.numLevels()),
+                        compactExecutor,
+                        levels),
+                levels,
                 getMaxSequenceNumber(restoreFiles),
                 keyComparator,
                 mergeFunction.copy(),
@@ -150,11 +157,13 @@ public class KeyValueFileStoreWrite extends AbstractFileStoreWrite<KeyValue> {
             BinaryRowData partition,
             int bucket,
             CompactStrategy compactStrategy,
-            ExecutorService compactExecutor) {
+            ExecutorService compactExecutor,
+            Levels levels) {
         Comparator<RowData> keyComparator = keyComparatorSupplier.get();
         CompactRewriter rewriter = compactRewriter(partition, bucket, keyComparator);
-        return new CompactManager(
+        return new MergeTreeCompactManager(
                 compactExecutor,
+                levels,
                 compactStrategy,
                 keyComparator,
                 options.targetFileSize(),
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCompact.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCompact.java
index 68014345..21161345 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCompact.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/TableCompact.java
@@ -19,9 +19,9 @@
 package org.apache.flink.table.store.table.sink;
 
 import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.compact.CompactResult;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.mergetree.Increment;
-import org.apache.flink.table.store.file.mergetree.compact.CompactResult;
 import org.apache.flink.table.store.file.operation.FileStoreScan;
 import org.apache.flink.table.store.file.operation.FileStoreWrite;
 import org.apache.flink.table.store.file.predicate.PredicateConverter;
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyCompactManagerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyCompactManagerTest.java
new file mode 100644
index 00000000..9979f080
--- /dev/null
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyCompactManagerTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.data;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.table.store.file.data.DataFileTestUtils.newFile;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link org.apache.flink.table.store.file.data.AppendOnlyCompactManager}. */
+public class AppendOnlyCompactManagerTest {
+
+    @Test
+    public void testPickEmptyAndNotRelease() {
+        // 1~50 is small enough, so hold it
+        List<DataFileMeta> toCompact = Collections.singletonList(newFile(1L, 50L));
+        innerTest(toCompact, false, Collections.emptyList(), toCompact);
+    }
+
+    @Test
+    public void testPickEmptyAndRelease() {
+        // large file, release
+        innerTest(
+                Collections.singletonList(newFile(1L, 1024L)),
+                false,
+                Collections.emptyList(),
+                Collections.emptyList());
+
+        // small file at last, release previous
+        innerTest(
+                Arrays.asList(newFile(1L, 1024L), newFile(1025L, 2049L), newFile(2050L, 2100L)),
+                false,
+                Collections.emptyList(),
+                Collections.singletonList(newFile(2050L, 2100L)));
+        innerTest(
+                Arrays.asList(
+                        newFile(1L, 1024L),
+                        newFile(1025L, 2049L),
+                        newFile(2050L, 2100L),
+                        newFile(2100L, 2110L)),
+                false,
+                Collections.emptyList(),
+                Arrays.asList(newFile(2050L, 2100L), newFile(2100L, 2110L)));
+        innerTest(
+                Arrays.asList(newFile(1L, 1024L), newFile(1025L, 2049L), newFile(2050L, 2500L)),
+                false,
+                Collections.emptyList(),
+                Collections.singletonList(newFile(2050L, 2500L)));
+        innerTest(
+                Arrays.asList(
+                        newFile(1L, 1024L),
+                        newFile(1025L, 2049L),
+                        newFile(2050L, 2500L),
+                        newFile(2501L, 4096L),
+                        newFile(4097L, 6000L),
+                        newFile(6001L, 7000L),
+                        newFile(7001L, 7600L)),
+                false,
+                Collections.emptyList(),
+                Collections.singletonList(newFile(7001L, 7600L)));
+
+        // ignore single small file (in the middle)
+        innerTest(
+                Arrays.asList(
+                        newFile(1L, 1024L),
+                        newFile(1025L, 2049L),
+                        newFile(2050L, 2500L),
+                        newFile(2501L, 4096L)),
+                false,
+                Collections.emptyList(),
+                Collections.singletonList(newFile(2501L, 4096L)));
+
+        innerTest(
+                Arrays.asList(
+                        newFile(1L, 1024L),
+                        newFile(1025L, 2049L),
+                        newFile(2050L, 2500L),
+                        newFile(2501L, 4096L),
+                        newFile(4097L, 6000L)),
+                false,
+                Collections.emptyList(),
+                Collections.singletonList(newFile(4097L, 6000L)));
+
+        // wait for more file
+        innerTest(
+                Arrays.asList(newFile(1L, 500L), newFile(501L, 1000L)),
+                false,
+                Collections.emptyList(),
+                Arrays.asList(newFile(1L, 500L), newFile(501L, 1000L)));
+
+        innerTest(
+                Arrays.asList(newFile(1L, 500L), newFile(501L, 1000L), newFile(1001L, 2026L)),
+                false,
+                Collections.emptyList(),
+                Arrays.asList(newFile(501L, 1000L), newFile(1001L, 2026L)));
+
+        innerTest(
+                Arrays.asList(newFile(1L, 2000L), newFile(2001L, 2005L), newFile(2006L, 2010L)),
+                false,
+                Collections.emptyList(),
+                Arrays.asList(newFile(2001L, 2005L), newFile(2006L, 2010L)));
+    }
+
+    @Test
+    public void testPick() {
+        // fileNum is 13 (which > 12) and totalFileSize is 130 (which < 1024)
+        List<DataFileMeta> toCompact1 =
+                Arrays.asList(
+                        // 1~10, 11~20, ..., 111~120
+                        newFile(1L, 10L),
+                        newFile(11L, 20L),
+                        newFile(21L, 30L),
+                        newFile(31L, 40L),
+                        newFile(41L, 50L),
+                        newFile(51L, 60L),
+                        newFile(61L, 70L),
+                        newFile(71L, 80L),
+                        newFile(81L, 90L),
+                        newFile(91L, 100L),
+                        newFile(101L, 110L),
+                        newFile(111L, 120L),
+                        newFile(121L, 130L));
+        innerTest(
+                toCompact1,
+                true,
+                toCompact1.subList(0, toCompact1.size() - 1),
+                Collections.singletonList(newFile(121L, 130L)));
+
+        // fileNum is 4 (which > 3) and totalFileSize is 1026 (which > 1024)
+        List<DataFileMeta> toCompact2 =
+                Arrays.asList(
+                        // 1~2, 3~500, 501~1000, 1001~1025
+                        newFile(1L, 2L),
+                        newFile(3L, 500L),
+                        newFile(501L, 1000L),
+                        newFile(1001L, 1025L),
+                        newFile(1026L, 1050L));
+        innerTest(
+                toCompact2,
+                true,
+                toCompact2.subList(0, 4),
+                Collections.singletonList(newFile(1026L, 1050L)));
+
+        // fileNum is 13 (which > 12) and totalFileSize is 130 (which < 1024)
+        List<DataFileMeta> toCompact3 =
+                Arrays.asList(
+                        newFile(1L, 1022L),
+                        newFile(1023L, 1024L),
+                        newFile(1025L, 2050L),
+                        // 2051~2510, ..., 2611~2620
+                        newFile(2051L, 2510L),
+                        newFile(2511L, 2520L),
+                        newFile(2521L, 2530L),
+                        newFile(2531L, 2540L),
+                        newFile(2541L, 2550L),
+                        newFile(2551L, 2560L),
+                        newFile(2561L, 2570L),
+                        newFile(2571L, 2580L),
+                        newFile(2581L, 2590L),
+                        newFile(2591L, 2600L),
+                        newFile(2601L, 2610L),
+                        newFile(2611L, 2620L),
+                        newFile(2621L, 2630L));
+        innerTest(
+                toCompact3,
+                true,
+                toCompact3.subList(3, toCompact3.size() - 1),
+                Collections.singletonList(newFile(2621L, 2630L)));
+    }
+
+    private void innerTest(
+            List<DataFileMeta> toCompactBeforePick,
+            boolean expectedPresent,
+            List<DataFileMeta> expectedCompactBefore,
+            List<DataFileMeta> toCompactAfterPick) {
+        int minFileNum = 4;
+        int maxFileNum = 12;
+        long targetFileSize = 1024;
+        AppendOnlyCompactManager manager =
+                new AppendOnlyCompactManager(
+                        null,
+                        new LinkedList<>(toCompactBeforePick),
+                        minFileNum,
+                        maxFileNum,
+                        targetFileSize,
+                        null);
+        Optional<List<DataFileMeta>> actual = manager.pickCompactBefore();
+        assertThat(actual.isPresent()).isEqualTo(expectedPresent);
+        if (expectedPresent) {
+            assertThat(actual.get()).containsExactlyElementsOf(expectedCompactBefore);
+        }
+        assertThat(manager.getToCompact()).containsExactlyElementsOf(toCompactAfterPick);
+    }
+}
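Read together, the cases above pin down when a prefix of the to-compact queue becomes eligible: a single small file is held, oversized leading files are released, and a pick happens once enough files or enough bytes have accumulated (minFileNum = 4, maxFileNum = 12, targetFileSize = 1024 in innerTest). A rough sketch of just that size/count trigger, inferred from these expectations rather than taken from the committed pickCompactBefore implementation (the release/skip handling of already-full leading files is left out):

    // Hypothetical helper, inferred from the test expectations above; the name and
    // placement are assumptions, not the committed AppendOnlyCompactManager API.
    static boolean reachedCompactionThreshold(
            java.util.List<DataFileMeta> candidates,
            int minFileNum,
            int maxFileNum,
            long targetFileSize) {
        long totalSize = candidates.stream().mapToLong(DataFileMeta::fileSize).sum();
        // Either many small files piled up, or at least minFileNum files covering targetFileSize bytes.
        return candidates.size() >= maxFileNum
                || (candidates.size() >= minFileNum && totalSize >= targetFileSize);
    }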
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java
index 3fdeb330..a60380c0 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/AppendOnlyWriterTest.java
@@ -35,13 +35,18 @@ import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Executors;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -61,6 +66,9 @@ public class AppendOnlyWriterTest {
 
     private static final String AVRO = "avro";
     private static final String PART = "2022-05-01";
+    private static final long SCHEMA_ID = 0L;
+    private static final int MIN_FILE_NUM = 3;
+    private static final int MAX_FILE_NUM = 4;
 
     @BeforeEach
     public void before() {
@@ -69,7 +77,7 @@ public class AppendOnlyWriterTest {
 
     @Test
     public void testEmptyCommits() throws Exception {
-        RecordWriter<RowData> writer = createWriter(1024 * 1024L, SCHEMA, 0);
+        RecordWriter<RowData> writer = createEmptyWriter(1024 * 1024L);
 
         for (int i = 0; i < 3; i++) {
             writer.sync();
@@ -83,7 +91,7 @@ public class AppendOnlyWriterTest {
 
     @Test
     public void testSingleWrite() throws Exception {
-        RecordWriter<RowData> writer = createWriter(1024 * 1024L, SCHEMA, 0);
+        RecordWriter<RowData> writer = createEmptyWriter(1024 * 1024L);
         writer.write(row(1, "AAA", PART));
 
         List<DataFileMeta> result = writer.close();
@@ -106,14 +114,14 @@ public class AppendOnlyWriterTest {
                 };
         assertThat(meta.valueStats()).isEqualTo(STATS_SERIALIZER.toBinary(expected));
 
-        assertThat(meta.minSequenceNumber()).isEqualTo(1);
-        assertThat(meta.maxSequenceNumber()).isEqualTo(1);
+        assertThat(meta.minSequenceNumber()).isEqualTo(0);
+        assertThat(meta.maxSequenceNumber()).isEqualTo(0);
         assertThat(meta.level()).isEqualTo(DataFileMeta.DUMMY_LEVEL);
     }
 
     @Test
     public void testMultipleCommits() throws Exception {
-        RecordWriter<RowData> writer = createWriter(1024 * 1024L, SCHEMA, 0);
+        RecordWriter<RowData> writer = createWriter(1024 * 1024L, true, Collections.emptyList());
 
         // Commit 5 continues txn.
         for (int txn = 0; txn < 5; txn += 1) {
@@ -127,8 +135,25 @@ public class AppendOnlyWriterTest {
 
             writer.sync();
             Increment inc = writer.prepareCommit();
-            assertThat(inc.compactBefore()).isEqualTo(Collections.emptyList());
-            assertThat(inc.compactAfter()).isEqualTo(Collections.emptyList());
+            if (txn > 0 && txn % 3 == 0) {
+                assertThat(inc.compactBefore()).hasSize(4);
+                assertThat(inc.compactAfter()).hasSize(1);
+                DataFileMeta compactAfter = inc.compactAfter().get(0);
+                assertThat(compactAfter.fileName()).startsWith("compact-");
+                assertThat(compactAfter.fileSize())
+                        .isEqualTo(
+                                inc.compactBefore().stream()
+                                        .mapToLong(DataFileMeta::fileSize)
+                                        .sum());
+                assertThat(compactAfter.rowCount())
+                        .isEqualTo(
+                                inc.compactBefore().stream()
+                                        .mapToLong(DataFileMeta::rowCount)
+                                        .sum());
+            } else {
+                assertThat(inc.compactBefore()).isEqualTo(Collections.emptyList());
+                assertThat(inc.compactAfter()).isEqualTo(Collections.emptyList());
+            }
 
             assertThat(inc.newFiles().size()).isEqualTo(1);
             DataFileMeta meta = inc.newFiles().get(0);
@@ -149,8 +174,8 @@ public class AppendOnlyWriterTest {
                     };
             assertThat(meta.valueStats()).isEqualTo(STATS_SERIALIZER.toBinary(expected));
 
-            assertThat(meta.minSequenceNumber()).isEqualTo(start + 1);
-            assertThat(meta.maxSequenceNumber()).isEqualTo(end);
+            assertThat(meta.minSequenceNumber()).isEqualTo(start);
+            assertThat(meta.maxSequenceNumber()).isEqualTo(end - 1);
             assertThat(meta.level()).isEqualTo(DataFileMeta.DUMMY_LEVEL);
         }
     }
@@ -159,21 +184,21 @@ public class AppendOnlyWriterTest {
     public void testRollingWrite() throws Exception {
         // Set a very small target file size, so that we will roll over to a new file even if
         // writing one record.
-        RecordWriter<RowData> writer = createWriter(10L, SCHEMA, 0);
+        AppendOnlyWriter writer = createEmptyWriter(10L);
 
         for (int i = 0; i < 10; i++) {
             writer.write(row(i, String.format("%03d", i), PART));
         }
 
         writer.sync();
-        Increment inc = writer.prepareCommit();
-        assertThat(inc.compactBefore()).isEqualTo(Collections.emptyList());
-        assertThat(inc.compactAfter()).isEqualTo(Collections.emptyList());
+        Increment firstInc = writer.prepareCommit();
+        assertThat(firstInc.compactBefore()).isEqualTo(Collections.emptyList());
+        assertThat(firstInc.compactAfter()).isEqualTo(Collections.emptyList());
 
-        assertThat(inc.newFiles().size()).isEqualTo(10);
+        assertThat(firstInc.newFiles().size()).isEqualTo(10);
 
         int id = 0;
-        for (DataFileMeta meta : inc.newFiles()) {
+        for (DataFileMeta meta : firstInc.newFiles()) {
             Path path = pathFactory.toPath(meta.fileName());
             assertThat(path.getFileSystem().exists(path)).isTrue();
 
@@ -190,12 +215,48 @@ public class AppendOnlyWriterTest {
                     };
             assertThat(meta.valueStats()).isEqualTo(STATS_SERIALIZER.toBinary(expected));
 
-            assertThat(meta.minSequenceNumber()).isEqualTo(id + 1);
-            assertThat(meta.maxSequenceNumber()).isEqualTo(id + 1);
+            assertThat(meta.minSequenceNumber()).isEqualTo(id);
+            assertThat(meta.maxSequenceNumber()).isEqualTo(id);
             assertThat(meta.level()).isEqualTo(DataFileMeta.DUMMY_LEVEL);
 
             id += 1;
         }
+
+        // increase target file size to test compaction
+        long targetFileSize = 1024 * 1024L;
+        writer = createWriter(targetFileSize, true, firstInc.newFiles());
+        assertThat(writer.getToCompact()).containsExactlyElementsOf(firstInc.newFiles());
+        writer.write(row(id, String.format("%03d", id), PART));
+        writer.sync();
+        Increment secInc = writer.prepareCommit();
+
+        // check compact before and after
+        List<DataFileMeta> compactBefore = secInc.compactBefore();
+        List<DataFileMeta> compactAfter = secInc.compactAfter();
+        assertThat(compactBefore)
+                .containsExactlyInAnyOrderElementsOf(firstInc.newFiles().subList(0, 4));
+        assertThat(compactAfter).hasSize(1);
+        assertThat(compactBefore.stream().mapToLong(DataFileMeta::fileSize).sum())
+                .isEqualTo(compactAfter.stream().mapToLong(DataFileMeta::fileSize).sum());
+        assertThat(compactBefore.stream().mapToLong(DataFileMeta::rowCount).sum())
+                .isEqualTo(compactAfter.stream().mapToLong(DataFileMeta::rowCount).sum());
+        // check seq number
+        assertThat(compactBefore.get(0).minSequenceNumber())
+                .isEqualTo(compactAfter.get(0).minSequenceNumber());
+        assertThat(compactBefore.get(compactBefore.size() - 1).maxSequenceNumber())
+                .isEqualTo(compactAfter.get(compactAfter.size() - 1).maxSequenceNumber());
+        assertThat(secInc.newFiles()).hasSize(1);
+
+        /* check toCompact[round + 1] is composed of
+         * <1> the compactAfter[round] (due to small size)
+         * <2> the rest of toCompact[round]
+         * <3> the newFiles[round]
+         * with strict order
+         */
+        List<DataFileMeta> toCompact = new ArrayList<>(compactAfter);
+        toCompact.addAll(firstInc.newFiles().subList(4, firstInc.newFiles().size()));
+        toCompact.addAll(secInc.newFiles());
+        assertThat(writer.getToCompact()).containsExactlyElementsOf(toCompact);
     }
 
     private FieldStats initStats(Integer min, Integer max, long nullCount) {
@@ -214,15 +275,68 @@ public class AppendOnlyWriterTest {
         return new DataFilePathFactory(
                 new Path(tempDir.toString()),
                 "dt=" + PART,
-                1,
+                0,
                 CoreOptions.FILE_FORMAT.defaultValue());
     }
 
-    private RecordWriter<RowData> createWriter(
-            long targetFileSize, RowType writeSchema, long maxSeqNum) {
-        FileFormat fileFormat = FileFormat.fromIdentifier(AVRO, new Configuration());
+    private AppendOnlyWriter createEmptyWriter(long targetFileSize) {
+        return createWriter(targetFileSize, false, Collections.emptyList());
+    }
 
+    private AppendOnlyWriter createWriter(
+            long targetFileSize, boolean forceCompact, List<DataFileMeta> scannedFiles) {
+        FileFormat fileFormat = FileFormat.fromIdentifier(AVRO, new Configuration());
+        LinkedList<DataFileMeta> toCompact = new LinkedList<>(scannedFiles);
         return new AppendOnlyWriter(
-                0, fileFormat, targetFileSize, writeSchema, maxSeqNum, pathFactory);
+                SCHEMA_ID,
+                fileFormat,
+                targetFileSize,
+                AppendOnlyWriterTest.SCHEMA,
+                toCompact,
+                new AppendOnlyCompactManager(
+                        Executors.newSingleThreadScheduledExecutor(
+                                new ExecutorThreadFactory("compaction-thread")),
+                        toCompact,
+                        MIN_FILE_NUM,
+                        MAX_FILE_NUM,
+                        targetFileSize,
+                        compactBefore ->
+                                compactBefore.isEmpty()
+                                        ? Collections.emptyList()
+                                        : Collections.singletonList(
+                                                generateCompactAfter(compactBefore))),
+                forceCompact,
+                pathFactory);
+    }
+
+    private DataFileMeta generateCompactAfter(List<DataFileMeta> toCompact) {
+        int size = toCompact.size();
+        long minSeq = toCompact.get(0).minSequenceNumber();
+        long maxSeq = toCompact.get(size - 1).maxSequenceNumber();
+        String fileName = "compact-" + UUID.randomUUID();
+        return DataFileMeta.forAppend(
+                fileName,
+                toCompact.stream().mapToLong(DataFileMeta::fileSize).sum(),
+                toCompact.stream().mapToLong(DataFileMeta::rowCount).sum(),
+                STATS_SERIALIZER.toBinary(
+                        new FieldStats[] {
+                            initStats(
+                                    toCompact.get(0).valueStats().min().getInt(0),
+                                    toCompact.get(size - 1).valueStats().max().getInt(0),
+                                    0),
+                            initStats(
+                                    toCompact.get(0).valueStats().min().getString(1).toString(),
+                                    toCompact
+                                            .get(size - 1)
+                                            .valueStats()
+                                            .max()
+                                            .getString(1)
+                                            .toString(),
+                                    0),
+                            initStats(PART, PART, 0)
+                        }),
+                minSeq,
+                maxSeq,
+                toCompact.get(0).schemaId());
     }
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTestUtils.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTestUtils.java
index d515a891..b977c6a8 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTestUtils.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/data/DataFileTestUtils.java
@@ -32,6 +32,21 @@ public class DataFileTestUtils {
         return newFile("", level, minKey, maxKey, maxSequence);
     }
 
+    public static DataFileMeta newFile(long minSeq, long maxSeq) {
+        return new DataFileMeta(
+                "",
+                maxSeq - minSeq + 1,
+                maxSeq - minSeq + 1,
+                DataFileMeta.EMPTY_MIN_KEY,
+                DataFileMeta.EMPTY_MAX_KEY,
+                DataFileMeta.EMPTY_KEY_STATS,
+                null,
+                minSeq,
+                maxSeq,
+                0L,
+                DataFileMeta.DUMMY_LEVEL);
+    }
+
     public static DataFileMeta newFile(
             String name, int level, int minKey, int maxKey, long maxSequence) {
         return new DataFileMeta(
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileFormatSuffixTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileFormatSuffixTest.java
index 2288c931..4be7c693 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileFormatSuffixTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/format/FileFormatSuffixTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.store.file.data.AppendOnlyCompactManager;
 import org.apache.flink.table.store.file.data.AppendOnlyWriter;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.data.DataFilePathFactory;
@@ -37,6 +38,7 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
+import java.util.LinkedList;
 import java.util.List;
 
 /** test file format suffix. */
@@ -57,8 +59,17 @@ public class FileFormatSuffixTest extends DataFileTest {
         DataFilePathFactory dataFilePathFactory =
                 new DataFilePathFactory(new Path(tempDir.toString()), "dt=1", 1, format);
         FileFormat fileFormat = FileFormat.fromIdentifier(format, new Configuration());
+        LinkedList<DataFileMeta> toCompact = new LinkedList<>();
         AppendOnlyWriter appendOnlyWriter =
-                new AppendOnlyWriter(0, fileFormat, 10, SCHEMA, 10, dataFilePathFactory);
+                new AppendOnlyWriter(
+                        0,
+                        fileFormat,
+                        10,
+                        SCHEMA,
+                        toCompact,
+                        new AppendOnlyCompactManager(null, toCompact, 4, 10, 10, null), // not used
+                        false,
+                        dataFilePathFactory);
         appendOnlyWriter.write(
                 GenericRowData.of(1, StringData.fromString("aaa"), StringData.fromString("1")));
         List<DataFileMeta> result = appendOnlyWriter.close();
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java
index df629d52..9867811d 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/manifest/ManifestCommittableSerializerTest.java
@@ -29,7 +29,7 @@ import java.util.Arrays;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
+import static org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactManagerTest.row;
 import static org.apache.flink.table.store.file.stats.StatsTestUtils.newTableStats;
 import static org.assertj.core.api.Assertions.assertThat;
 
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LevelsTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LevelsTest.java
index 674bb2ec..aed28096 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LevelsTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LevelsTest.java
@@ -27,7 +27,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 
-import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
+import static org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactManagerTest.row;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link Levels}. */
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
index eecb0f30..699eeb54 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/MergeTreeTest.java
@@ -32,11 +32,11 @@ import org.apache.flink.table.store.file.data.DataFileReader;
 import org.apache.flink.table.store.file.data.DataFileWriter;
 import org.apache.flink.table.store.file.format.FlushingFileFormat;
 import org.apache.flink.table.store.file.memory.HeapMemorySegmentPool;
-import org.apache.flink.table.store.file.mergetree.compact.CompactManager;
 import org.apache.flink.table.store.file.mergetree.compact.CompactRewriter;
 import org.apache.flink.table.store.file.mergetree.compact.CompactStrategy;
 import org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
 import org.apache.flink.table.store.file.mergetree.compact.IntervalPartition;
+import org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactManager;
 import org.apache.flink.table.store.file.mergetree.compact.UniversalCompaction;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.utils.FileStorePathFactory;
@@ -270,7 +270,7 @@ public class MergeTreeTest {
                 new MergeTreeWriter(
                         dataFileWriter.keyType(),
                         dataFileWriter.valueType(),
-                        createCompactManager(dataFileWriter, service),
+                        createCompactManager(dataFileWriter, service, files),
                         new Levels(comparator, files, options.numLevels()),
                         maxSequenceNumber,
                         comparator,
@@ -284,17 +284,19 @@ public class MergeTreeTest {
         return writer;
     }
 
-    private CompactManager createCompactManager(
-            DataFileWriter dataFileWriter, ExecutorService compactExecutor) {
-        CompactStrategy compactStrategy =
+    private MergeTreeCompactManager createCompactManager(
+            DataFileWriter dataFileWriter,
+            ExecutorService compactExecutor,
+            List<DataFileMeta> files) {
+        CompactStrategy strategy =
                 new UniversalCompaction(
                         options.maxSizeAmplificationPercent(),
-                        options.sizeRatio(),
+                        options.sortedRunSizeRatio(),
                         options.numSortedRunCompactionTrigger());
         CompactRewriter rewriter =
                 (outputLevel, dropDelete, sections) ->
                         dataFileWriter.write(
-                                new RecordReaderIterator<KeyValue>(
+                                new RecordReaderIterator<>(
                                         new MergeTreeReader(
                                                 sections,
                                                 dropDelete,
@@ -302,8 +304,13 @@ public class MergeTreeTest {
                                                 comparator,
                                                 new DeduplicateMergeFunction())),
                                 outputLevel);
-        return new CompactManager(
-                compactExecutor, compactStrategy, comparator, options.targetFileSize(), rewriter);
+        return new MergeTreeCompactManager(
+                compactExecutor,
+                new Levels(comparator, files, options.numLevels()),
+                strategy,
+                comparator,
+                options.targetFileSize(),
+                rewriter);
     }
 
     private void mergeCompacted(
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManagerTest.java
similarity index 96%
rename from flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java
rename to flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManagerTest.java
index de636f63..64c46c81 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/CompactManagerTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.store.file.mergetree.compact;
 
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.binary.BinaryRowData;
+import org.apache.flink.table.store.file.compact.CompactUnit;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.data.DataFileTestUtils;
 import org.apache.flink.table.store.file.mergetree.Levels;
@@ -44,8 +45,8 @@ import java.util.stream.Collectors;
 import static org.apache.flink.table.store.file.data.DataFileTestUtils.newFile;
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Test for {@link CompactManager}. */
-public class CompactManagerTest {
+/** Test for {@link MergeTreeCompactManager}. */
+public class MergeTreeCompactManagerTest {
 
     private final Comparator<RowData> comparator = Comparator.comparingInt(o -> o.getInt(0));
 
@@ -194,11 +195,11 @@ public class CompactManagerTest {
             files.add(minMax.toFile(i));
         }
         Levels levels = new Levels(comparator, files, 3);
-        CompactManager manager =
-                new CompactManager(
-                        service, strategy, comparator, 2, testRewriter(expectedDropDelete));
-        manager.submitCompaction(levels);
-        manager.finishCompaction(levels, true);
+        MergeTreeCompactManager manager =
+                new MergeTreeCompactManager(
+                        service, levels, strategy, comparator, 2, testRewriter(expectedDropDelete));
+        manager.submitCompaction();
+        manager.finishCompaction(true);
         List<LevelMinMax> outputs =
                 levels.allFiles().stream().map(LevelMinMax::new).collect(Collectors.toList());
         assertThat(outputs).isEqualTo(expected);
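
    A hedged sketch of the reworked MergeTreeCompactManager call pattern captured by this hunk
    (not part of the patch); the comment on finishCompaction is an assumption:

        // Old API (removed lines): Levels was threaded through each call.
        //     manager.submitCompaction(levels);
        //     manager.finishCompaction(levels, true);
        // New API (added lines): the manager owns Levels from construction on.
        MergeTreeCompactManager manager =
                new MergeTreeCompactManager(
                        service,       // executor running the asynchronous compact task
                        levels,        // levels structure the manager maintains
                        strategy,
                        comparator,
                        2,             // target file size used by this test
                        testRewriter(expectedDropDelete));
        manager.submitCompaction();      // presumably the strategy picks a CompactUnit from the owned levels
        manager.finishCompaction(true);  // assumed: 'true' blocks until the submitted task completes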
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java
index 7dcb6f61..a42ede9c 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompactionTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.store.file.mergetree.compact;
 
+import org.apache.flink.table.store.file.compact.CompactUnit;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.mergetree.LevelSortedRun;
 import org.apache.flink.table.store.file.mergetree.SortedRun;
diff --git a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
index 8c82a433..70b45314 100644
--- a/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
+++ b/flink-table-store-kafka/src/test/java/org/apache/flink/table/store/kafka/KafkaLogTestUtils.java
@@ -58,7 +58,7 @@ import java.util.stream.IntStream;
 import static org.apache.flink.table.data.binary.BinaryRowDataUtil.EMPTY_ROW;
 import static org.apache.flink.table.store.CoreOptions.LOG_CHANGELOG_MODE;
 import static org.apache.flink.table.store.CoreOptions.LOG_CONSISTENCY;
-import static org.apache.flink.table.store.file.mergetree.compact.CompactManagerTest.row;
+import static org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactManagerTest.row;
 import static org.apache.flink.table.store.kafka.KafkaLogOptions.BOOTSTRAP_SERVERS;
 import static org.apache.flink.table.store.kafka.KafkaLogOptions.TOPIC;
 
