This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 241ac76169 [flink] Copy bytes with multiple threads when preforming 
precommit compact for changelogs (#4907)
241ac76169 is described below

commit 241ac76169060fc80fa959c52ed6bd7f750d7fff
Author: tsreaper <[email protected]>
AuthorDate: Sun Apr 6 19:27:08 2025 +0800

    [flink] Copy bytes with multiple threads when preforming precommit compact 
for changelogs (#4907)
---
 .../generated/flink_connector_configuration.html   |   8 +-
 .../org/apache/paimon/utils/ThreadPoolUtils.java   |  29 +-
 .../apache/paimon/flink/FlinkConnectorOptions.java |  19 +-
 .../ChangelogCompactCoordinateOperator.java        | 101 +++++-
 .../compact/changelog/ChangelogCompactTask.java    | 204 +++++++----
 .../changelog/ChangelogCompactWorkerOperator.java  |  29 +-
 .../org/apache/paimon/flink/sink/FlinkSink.java    |   2 +-
 .../flink/PrimaryKeyFileStoreTableITCase.java      |   6 +-
 .../ChangelogCompactCoordinateOperatorTest.java    | 371 +++++++++++++++++++++
 .../ChangelogCompactTaskSerializerTest.java        |  13 +-
 10 files changed, 684 insertions(+), 98 deletions(-)

diff --git 
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html 
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index feb8715701..f348c35e8e 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -26,6 +26,12 @@ under the License.
         </tr>
     </thead>
     <tbody>
+        <tr>
+            <td><h5>changelog.precommit-compact.thread-num</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>Integer</td>
+            <td>Maximum number of threads to copy bytes from small changelog 
files. By default is the number of processors available to the Java virtual 
machine.</td>
+        </tr>
         <tr>
             <td><h5>end-input.watermark</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
@@ -108,7 +114,7 @@ under the License.
             <td><h5>precommit-compact</h5></td>
             <td style="word-wrap: break-word;">false</td>
             <td>Boolean</td>
-            <td>If true, it will add a compact coordinator and worker operator 
after the writer operator,in order to compact several changelog files (for 
primary key tables) or newly created data files (for unaware bucket tables) 
from the same partition into large ones, which can decrease the number of small 
files. </td>
+            <td>If true, it will add a compact coordinator and worker operator 
after the writer operator,in order to compact several changelog files (for 
primary key tables) or newly created data files (for unaware bucket tables) 
from the same partition into large ones, which can decrease the number of small 
files.</td>
         </tr>
         <tr>
             <td><h5>scan.bounded</h5></td>
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java
index e4b3da8ca8..a4790583c5 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/ThreadPoolUtils.java
@@ -138,17 +138,7 @@ public class ThreadPoolUtils {
 
     public static <U> void randomlyOnlyExecute(
             ExecutorService executor, Consumer<U> processor, Collection<U> 
input) {
-        List<Future<?>> futures = new ArrayList<>(input.size());
-        ClassLoader cl = Thread.currentThread().getContextClassLoader();
-        for (U u : input) {
-            futures.add(
-                    executor.submit(
-                            () -> {
-                                
Thread.currentThread().setContextClassLoader(cl);
-                                processor.accept(u);
-                            }));
-        }
-        awaitAllFutures(futures);
+        awaitAllFutures(submitAllTasks(executor, processor, input));
     }
 
     public static <U, T> Iterator<T> randomlyExecuteSequentialReturn(
@@ -189,7 +179,22 @@ public class ThreadPoolUtils {
                 });
     }
 
-    private static void awaitAllFutures(List<Future<?>> futures) {
+    public static <U> List<Future<?>> submitAllTasks(
+            ExecutorService executor, Consumer<U> processor, Collection<U> 
input) {
+        List<Future<?>> futures = new ArrayList<>(input.size());
+        ClassLoader cl = Thread.currentThread().getContextClassLoader();
+        for (U u : input) {
+            futures.add(
+                    executor.submit(
+                            () -> {
+                                
Thread.currentThread().setContextClassLoader(cl);
+                                processor.accept(u);
+                            }));
+        }
+        return futures;
+    }
+
+    public static void awaitAllFutures(List<Future<?>> futures) {
         for (Future<?> future : futures) {
             try {
                 future.get();
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index adf7c9624c..a9e7f0f7d1 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -430,7 +430,24 @@ public class FlinkConnectorOptions {
                                     + "in order to compact several changelog 
files (for primary key tables) "
                                     + "or newly created data files (for 
unaware bucket tables) "
                                     + "from the same partition into large 
ones, "
-                                    + "which can decrease the number of small 
files. ");
+                                    + "which can decrease the number of small 
files.");
+
+    public static final ConfigOption<Integer> 
CHANGELOG_PRECOMMIT_COMPACT_THREAD_NUM =
+            key("changelog.precommit-compact.thread-num")
+                    .intType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Maximum number of threads to copy bytes from 
small changelog files. "
+                                    + "By default is the number of processors 
available to the Java virtual machine.");
+
+    @ExcludeFromDocumentation("Most users won't need to adjust this config")
+    public static final ConfigOption<MemorySize> 
CHANGELOG_PRECOMMIT_COMPACT_BUFFER_SIZE =
+            key("changelog.precommit-compact.buffer-size")
+                    .memoryType()
+                    .defaultValue(MemorySize.ofMebiBytes(128))
+                    .withDescription(
+                            "The buffer size for copying bytes from small 
changelog files. "
+                                    + "The default value is 128 MB.");
 
     public static final ConfigOption<String> SOURCE_OPERATOR_UID_SUFFIX =
             key("source.operator-uid.suffix")
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java
index 9f6bd4431d..34553ebe5f 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java
@@ -18,12 +18,13 @@
 
 package org.apache.paimon.flink.compact.changelog;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.sink.Committable;
 import org.apache.paimon.io.CompactIncrement;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataIncrement;
-import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.CommitMessageImpl;
 
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@@ -35,6 +36,7 @@ import org.apache.flink.types.Either;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -49,13 +51,14 @@ public class ChangelogCompactCoordinateOperator
         extends AbstractStreamOperator<Either<Committable, 
ChangelogCompactTask>>
         implements OneInputStreamOperator<Committable, Either<Committable, 
ChangelogCompactTask>>,
                 BoundedOneInput {
-    private final FileStoreTable table;
+
+    private final CoreOptions options;
 
     private transient long checkpointId;
     private transient Map<BinaryRow, PartitionChangelog> partitionChangelogs;
 
-    public ChangelogCompactCoordinateOperator(FileStoreTable table) {
-        this.table = table;
+    public ChangelogCompactCoordinateOperator(CoreOptions options) {
+        this.options = options;
     }
 
     @Override
@@ -63,7 +66,7 @@ public class ChangelogCompactCoordinateOperator
         super.open();
 
         checkpointId = Long.MIN_VALUE;
-        partitionChangelogs = new HashMap<>();
+        partitionChangelogs = new LinkedHashMap<>();
     }
 
     public void processElement(StreamRecord<Committable> record) {
@@ -81,10 +84,26 @@ public class ChangelogCompactCoordinateOperator
             return;
         }
 
+        // Changelog files are not stored in an LSM tree,
+        // so we can regard them as files without primary keys.
+        long targetFileSize = options.targetFileSize(false);
+        long compactionFileSize =
+                Math.min(
+                        options.compactionFileSize(false),
+                        options.toConfiguration()
+                                
.get(FlinkConnectorOptions.CHANGELOG_PRECOMMIT_COMPACT_BUFFER_SIZE)
+                                .getBytes());
+
         BinaryRow partition = message.partition();
         Integer bucket = message.bucket();
-        long targetFileSize = table.coreOptions().targetFileSize(false);
+        List<DataFileMeta> skippedNewChangelogs = new ArrayList<>();
+        List<DataFileMeta> skippedCompactChangelogs = new ArrayList<>();
+
         for (DataFileMeta meta : message.newFilesIncrement().changelogFiles()) 
{
+            if (meta.fileSize() >= compactionFileSize) {
+                skippedNewChangelogs.add(meta);
+                continue;
+            }
             partitionChangelogs
                     .computeIfAbsent(partition, k -> new PartitionChangelog())
                     .addNewChangelogFile(bucket, meta);
@@ -94,6 +113,10 @@ public class ChangelogCompactCoordinateOperator
             }
         }
         for (DataFileMeta meta : message.compactIncrement().changelogFiles()) {
+            if (meta.fileSize() >= compactionFileSize) {
+                skippedCompactChangelogs.add(meta);
+                continue;
+            }
             partitionChangelogs
                     .computeIfAbsent(partition, k -> new PartitionChangelog())
                     .addCompactChangelogFile(bucket, meta);
@@ -111,11 +134,11 @@ public class ChangelogCompactCoordinateOperator
                         new DataIncrement(
                                 message.newFilesIncrement().newFiles(),
                                 message.newFilesIncrement().deletedFiles(),
-                                Collections.emptyList()),
+                                skippedNewChangelogs),
                         new CompactIncrement(
                                 message.compactIncrement().compactBefore(),
                                 message.compactIncrement().compactAfter(),
-                                Collections.emptyList()),
+                                skippedCompactChangelogs),
                         message.indexIncrement());
         Committable newCommittable =
                 new Committable(committable.checkpointId(), 
Committable.Kind.FILE, newMessage);
@@ -132,15 +155,59 @@ public class ChangelogCompactCoordinateOperator
 
     private void emitPartitionChangelogCompactTask(BinaryRow partition) {
         PartitionChangelog partitionChangelog = 
partitionChangelogs.get(partition);
-        output.collect(
-                new StreamRecord<>(
-                        Either.Right(
-                                new ChangelogCompactTask(
-                                        checkpointId,
-                                        partition,
-                                        table.coreOptions().bucket(),
-                                        
partitionChangelog.newFileChangelogFiles,
-                                        
partitionChangelog.compactChangelogFiles))));
+        int numNewChangelogFiles =
+                partitionChangelog.newFileChangelogFiles.values().stream()
+                        .mapToInt(List::size)
+                        .sum();
+        int numCompactChangelogFiles =
+                partitionChangelog.compactChangelogFiles.values().stream()
+                        .mapToInt(List::size)
+                        .sum();
+        if (numNewChangelogFiles + numCompactChangelogFiles == 1) {
+            // there is only one changelog file in this partition, so we don't 
wrap it as a
+            // compaction task
+            CommitMessageImpl message;
+            if (numNewChangelogFiles == 1) {
+                Map.Entry<Integer, List<DataFileMeta>> entry =
+                        
partitionChangelog.newFileChangelogFiles.entrySet().iterator().next();
+                message =
+                        new CommitMessageImpl(
+                                partition,
+                                entry.getKey(),
+                                options.bucket(),
+                                new DataIncrement(
+                                        Collections.emptyList(),
+                                        Collections.emptyList(),
+                                        entry.getValue()),
+                                CompactIncrement.emptyIncrement());
+            } else {
+                Map.Entry<Integer, List<DataFileMeta>> entry =
+                        
partitionChangelog.compactChangelogFiles.entrySet().iterator().next();
+                message =
+                        new CommitMessageImpl(
+                                partition,
+                                entry.getKey(),
+                                options.bucket(),
+                                DataIncrement.emptyIncrement(),
+                                new CompactIncrement(
+                                        Collections.emptyList(),
+                                        Collections.emptyList(),
+                                        entry.getValue()));
+            }
+            Committable newCommittable =
+                    new Committable(checkpointId, Committable.Kind.FILE, 
message);
+            output.collect(new StreamRecord<>(Either.Left(newCommittable)));
+        } else {
+            output.collect(
+                    new StreamRecord<>(
+                            Either.Right(
+                                    new ChangelogCompactTask(
+                                            checkpointId,
+                                            partition,
+                                            options.bucket(),
+                                            
partitionChangelog.newFileChangelogFiles,
+                                            
partitionChangelog.compactChangelogFiles))));
+        }
         partitionChangelogs.remove(partition);
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
index 96fe15c344..7605af61c4 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
@@ -23,19 +23,24 @@ import 
org.apache.paimon.flink.compact.changelog.format.CompactedChangelogReadOn
 import org.apache.paimon.flink.sink.Committable;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.PositionOutputStream;
-import org.apache.paimon.fs.SeekableInputStream;
 import org.apache.paimon.io.CompactIncrement;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFilePathFactory;
 import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.IOUtils;
 import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.ThreadPoolUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -43,6 +48,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.function.BiConsumer;
 
 /**
  * {@link ChangelogCompactTask} to compact several changelog files from the 
same partition into one
@@ -50,6 +61,8 @@ import java.util.UUID;
  */
 public class ChangelogCompactTask implements Serializable {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(ChangelogCompactTask.class);
+
     private final long checkpointId;
     private final BinaryRow partition;
     private final int totalBuckets;
@@ -67,6 +80,11 @@ public class ChangelogCompactTask implements Serializable {
         this.totalBuckets = totalBuckets;
         this.newFileChangelogFiles = newFileChangelogFiles;
         this.compactChangelogFiles = compactChangelogFiles;
+        Preconditions.checkArgument(
+                newFileChangelogFiles.isEmpty() || 
compactChangelogFiles.isEmpty(),
+                "Both newFileChangelogFiles and compactChangelogFiles are not 
empty. "
+                        + "There is no such table where changelog is produced 
both from new files and from compaction. "
+                        + "This is unexpected.");
     }
 
     public long checkpointId() {
@@ -89,69 +107,102 @@ public class ChangelogCompactTask implements Serializable 
{
         return compactChangelogFiles;
     }
 
-    public List<Committable> doCompact(FileStoreTable table) throws Exception {
+    public List<Committable> doCompact(
+            FileStoreTable table, ExecutorService executor, MemorySize 
bufferSize)
+            throws Exception {
+        Preconditions.checkArgument(
+                bufferSize.getBytes() <= Integer.MAX_VALUE,
+                "Changelog pre-commit compaction buffer size ({} bytes) too 
large! "
+                        + "The maximum possible value is {} bytes.",
+                bufferSize.getBytes(),
+                Integer.MAX_VALUE);
+
         FileStorePathFactory pathFactory = table.store().pathFactory();
+        List<ReadTask> tasks = new ArrayList<>();
+        BiConsumer<Map<Integer, List<DataFileMeta>>, Boolean> addTasks =
+                (files, isCompactResult) -> {
+                    for (Map.Entry<Integer, List<DataFileMeta>> entry : 
files.entrySet()) {
+                        int bucket = entry.getKey();
+                        DataFilePathFactory dataFilePathFactory =
+                                
pathFactory.createDataFilePathFactory(partition, bucket);
+                        for (DataFileMeta meta : entry.getValue()) {
+                            ReadTask task =
+                                    new ReadTask(
+                                            table,
+                                            dataFilePathFactory.toPath(meta),
+                                            bucket,
+                                            isCompactResult,
+                                            meta);
+                            Preconditions.checkArgument(
+                                    meta.fileSize() <= bufferSize.getBytes(),
+                                    "Trying to compact changelog file with 
size {} bytes, "
+                                            + "while the buffer size is only 
{} bytes. This is unexpected.",
+                                    meta.fileSize(),
+                                    bufferSize.getBytes());
+                            tasks.add(task);
+                        }
+                    }
+                };
+        addTasks.accept(newFileChangelogFiles, false);
+        addTasks.accept(compactChangelogFiles, true);
+
+        Semaphore semaphore = new Semaphore((int) bufferSize.getBytes());
+        BlockingQueue<ReadTask> finishedTasks = new LinkedBlockingQueue<>();
+        List<Future<?>> futures =
+                ThreadPoolUtils.submitAllTasks(
+                        executor,
+                        t -> {
+                            // Why not create `finishedTasks` as a blocking 
queue and use it to
+                            // limit the total size of bytes awaiting to be 
copied? Because finished
+                            // tasks are added after their contents are read, 
so even if
+                            // `finishedTasks` is full, each thread can still 
read one more file,
+                            // and the limit will become `bytesInThreads + 
bufferSize`, not just
+                            // `bufferSize`.
+                            try {
+                                semaphore.acquire((int) t.meta.fileSize());
+                                t.readFully();
+                                finishedTasks.put(t);
+                            } catch (InterruptedException e) {
+                                Thread.currentThread().interrupt();
+                                throw new RuntimeException(e);
+                            }
+                        },
+                        tasks);
+
         OutputStream outputStream = new OutputStream();
         List<Result> results = new ArrayList<>();
-
-        // copy all changelog files to a new big file
-        for (Map.Entry<Integer, List<DataFileMeta>> entry : 
newFileChangelogFiles.entrySet()) {
-            int bucket = entry.getKey();
-            DataFilePathFactory dataFilePathFactory =
-                    pathFactory.createDataFilePathFactory(partition, bucket);
-            for (DataFileMeta meta : entry.getValue()) {
-                copyFile(
-                        outputStream,
-                        results,
-                        table,
-                        dataFilePathFactory.toPath(meta),
-                        bucket,
-                        false,
-                        meta);
-            }
-        }
-        for (Map.Entry<Integer, List<DataFileMeta>> entry : 
compactChangelogFiles.entrySet()) {
-            Integer bucket = entry.getKey();
-            DataFilePathFactory dataFilePathFactory =
-                    pathFactory.createDataFilePathFactory(partition, bucket);
-            for (DataFileMeta meta : entry.getValue()) {
-                copyFile(
-                        outputStream,
-                        results,
-                        table,
-                        dataFilePathFactory.toPath(meta),
-                        bucket,
-                        true,
-                        meta);
-            }
+        for (int i = 0; i < tasks.size(); i++) {
+            // copy all files into a new big file
+            ReadTask task = finishedTasks.take();
+            write(task, outputStream, results);
+            semaphore.release((int) task.meta.fileSize());
         }
         outputStream.out.close();
+        ThreadPoolUtils.awaitAllFutures(futures);
 
         return produceNewCommittables(results, table, pathFactory, 
outputStream.path);
     }
 
-    private void copyFile(
-            OutputStream outputStream,
-            List<Result> results,
-            FileStoreTable table,
-            Path path,
-            int bucket,
-            boolean isCompactResult,
-            DataFileMeta meta)
+    private void write(ReadTask task, OutputStream outputStream, List<Result> 
results)
             throws Exception {
         if (!outputStream.isInitialized) {
             Path outputPath =
-                    new Path(path.getParent(), "tmp-compacted-changelog-" + 
UUID.randomUUID());
-            outputStream.init(outputPath, 
table.fileIO().newOutputStream(outputPath, false));
+                    new Path(task.path.getParent(), "tmp-compacted-changelog-" 
+ UUID.randomUUID());
+            outputStream.init(outputPath, 
task.table.fileIO().newOutputStream(outputPath, false));
         }
-        long offset = outputStream.out.getPos();
-        try (SeekableInputStream in = table.fileIO().newInputStream(path)) {
-            IOUtils.copyBytes(in, outputStream.out, IOUtils.BLOCKSIZE, false);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Copying bytes from {} to {}", task.path, 
outputStream.path);
         }
-        table.fileIO().deleteQuietly(path);
+        long offset = outputStream.out.getPos();
+        outputStream.out.write(task.result);
         results.add(
                 new Result(
-                        bucket, isCompactResult, meta, offset, 
outputStream.out.getPos() - offset));
+                        task.bucket,
+                        task.isCompactResult,
+                        task.meta,
+                        offset,
+                        outputStream.out.getPos() - offset));
     }
 
     private List<Committable> produceNewCommittables(
@@ -172,23 +223,23 @@ public class ChangelogCompactTask implements Serializable 
{
                         + baseResult.bucket
                         + "-"
                         + baseResult.length;
-        table.fileIO()
-                .rename(
-                        changelogTempPath,
-                        dataFilePathFactory.toAlignedPath(
-                                realName
-                                        + "."
-                                        + 
CompactedChangelogReadOnlyFormat.getIdentifier(
-                                                baseResult.meta.fileFormat()),
-                                baseResult.meta));
-
-        List<Committable> newCommittables = new ArrayList<>();
+        Path realPath =
+                dataFilePathFactory.toAlignedPath(
+                        realName
+                                + "."
+                                + 
CompactedChangelogReadOnlyFormat.getIdentifier(
+                                        baseResult.meta.fileFormat()),
+                        baseResult.meta);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Rename {} to {}", changelogTempPath, realPath);
+        }
+        table.fileIO().rename(changelogTempPath, realPath);
 
         Map<Integer, List<Result>> bucketedResults = new HashMap<>();
         for (Result result : results) {
             bucketedResults.computeIfAbsent(result.bucket, b -> new 
ArrayList<>()).add(result);
         }
-
+        List<Committable> newCommittables = new ArrayList<>();
         for (Map.Entry<Integer, List<Result>> entry : 
bucketedResults.entrySet()) {
             List<DataFileMeta> newFilesChangelog = new ArrayList<>();
             List<DataFileMeta> compactChangelog = new ArrayList<>();
@@ -256,6 +307,39 @@ public class ChangelogCompactTask implements Serializable {
                 partition, newFileChangelogFiles, compactChangelogFiles);
     }
 
+    private static class ReadTask {
+
+        private final FileStoreTable table;
+        private final Path path;
+        private final int bucket;
+        private final boolean isCompactResult;
+        private final DataFileMeta meta;
+
+        private byte[] result = null;
+
+        private ReadTask(
+                FileStoreTable table,
+                Path path,
+                int bucket,
+                boolean isCompactResult,
+                DataFileMeta meta) {
+            this.table = table;
+            this.path = path;
+            this.bucket = bucket;
+            this.isCompactResult = isCompactResult;
+            this.meta = meta;
+        }
+
+        private void readFully() {
+            try {
+                result = 
IOUtils.readFully(table.fileIO().newInputStream(path), true);
+                table.fileIO().deleteQuietly(path);
+            } catch (IOException e) {
+                throw new UncheckedIOException(e);
+            }
+        }
+    }
+
     private static class OutputStream {
 
         private Path path;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactWorkerOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactWorkerOperator.java
index 260c25a315..9d5fb46e3e 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactWorkerOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactWorkerOperator.java
@@ -18,8 +18,12 @@
 
 package org.apache.paimon.flink.compact.changelog;
 
+import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.ThreadPoolUtils;
 
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -27,6 +31,7 @@ import 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.types.Either;
 
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 
 /**
  * Receive and process the {@link ChangelogCompactTask}s emitted by {@link
@@ -34,20 +39,40 @@ import java.util.List;
  */
 public class ChangelogCompactWorkerOperator extends 
AbstractStreamOperator<Committable>
         implements OneInputStreamOperator<Either<Committable, 
ChangelogCompactTask>, Committable> {
+
     private final FileStoreTable table;
 
+    private transient ExecutorService executor;
+    private transient MemorySize bufferSize;
+
     public ChangelogCompactWorkerOperator(FileStoreTable table) {
         this.table = table;
     }
 
+    @Override
+    public void open() throws Exception {
+        Options options = new Options(table.options());
+        int numThreads =
+                
options.getOptional(FlinkConnectorOptions.CHANGELOG_PRECOMMIT_COMPACT_THREAD_NUM)
+                        .orElse(Runtime.getRuntime().availableProcessors());
+        executor =
+                ThreadPoolUtils.createCachedThreadPool(
+                        numThreads, "changelog-compact-async-read-bytes");
+        bufferSize = 
options.get(FlinkConnectorOptions.CHANGELOG_PRECOMMIT_COMPACT_BUFFER_SIZE);
+        LOG.info(
+                "Creating {} threads and a buffer of {} bytes for changelog 
compaction.",
+                numThreads,
+                bufferSize.getBytes());
+    }
+
+    @Override
     public void processElement(StreamRecord<Either<Committable, 
ChangelogCompactTask>> record)
             throws Exception {
-
         if (record.getValue().isLeft()) {
             output.collect(new StreamRecord<>(record.getValue().left()));
         } else {
             ChangelogCompactTask task = record.getValue().right();
-            List<Committable> committables = task.doCompact(table);
+            List<Committable> committables = task.doCompact(table, executor, 
bufferSize);
             committables.forEach(committable -> output.collect(new 
StreamRecord<>(committable)));
         }
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 9c5bf11221..d99dd255e4 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -248,7 +248,7 @@ public abstract class FlinkSink<T> implements Serializable {
                                     "Changelog Compact Coordinator",
                                     new EitherTypeInfo<>(
                                             new CommittableTypeInfo(), new 
ChangelogTaskTypeInfo()),
-                                    new 
ChangelogCompactCoordinateOperator(table))
+                                    new 
ChangelogCompactCoordinateOperator(table.coreOptions()))
                             .forceNonParallel()
                             .transform(
                                     "Changelog Compact Worker",
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
index c701cefb30..d1ed5dbc84 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
@@ -679,7 +679,7 @@ public class PrimaryKeyFileStoreTableITCase extends 
AbstractTestBase {
                         + "WITH ("
                         + "    'bucket' = '10',\n"
                         + "    'changelog-producer' = 'lookup',\n"
-                        + "    'changelog.precommit-compact' = 'true',\n"
+                        + "    'precommit-compact' = 'true',\n"
                         + "    'snapshot.num-retained.min' = '3',\n"
                         + "    'snapshot.num-retained.max' = '3'\n"
                         + ")");
@@ -773,7 +773,7 @@ public class PrimaryKeyFileStoreTableITCase extends 
AbstractTestBase {
                         + "WITH ("
                         + "    'bucket' = '10',\n"
                         + "    'changelog-producer' = 'lookup',\n"
-                        + "    'changelog.precommit-compact' = 'true'\n"
+                        + "    'precommit-compact' = 'true'\n"
                         + ")");
 
         Path inputPath = new Path(path, "input");
@@ -1014,7 +1014,7 @@ public class PrimaryKeyFileStoreTableITCase extends 
AbstractTestBase {
                                 + "'changelog-producer' = 'lookup', "
                                 + "'lookup-wait' = '%s', "
                                 + "'deletion-vectors.enabled' = '%s', "
-                                + "'changelog.precommit-compact' = '%s'",
+                                + "'precommit-compact' = '%s'",
                         random.nextBoolean() ? "4mb" : "8mb",
                         random.nextBoolean(),
                         enableDeletionVectors,
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperatorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperatorTest.java
new file mode 100644
index 0000000000..4d5e9ac1b9
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperatorTest.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.compact.changelog;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.sink.CommittableTypeInfo;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.runtime.EitherSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.Preconditions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link ChangelogCompactCoordinateOperator}. */
+public class ChangelogCompactCoordinateOperatorTest {
+
+    @Test
+    public void testPrepareSnapshotWithMultipleFiles() throws Exception {
+        Options options = new Options();
+        options.set(CoreOptions.TARGET_FILE_SIZE, MemorySize.ofMebiBytes(8));
+        ChangelogCompactCoordinateOperator operator =
+                new ChangelogCompactCoordinateOperator(new 
CoreOptions(options));
+        OneInputStreamOperatorTestHarness<Committable, Either<Committable, 
ChangelogCompactTask>>
+                testHarness = createTestHarness(operator);
+
+        testHarness.open();
+        testHarness.processElement(
+                new StreamRecord<>(
+                        createCommittable(
+                                1,
+                                BinaryRow.EMPTY_ROW,
+                                0,
+                                Collections.emptyList(),
+                                Arrays.asList(3, 2, 5, 4))));
+        testHarness.processElement(
+                new StreamRecord<>(
+                        createCommittable(
+                                1,
+                                BinaryRow.EMPTY_ROW,
+                                1,
+                                Collections.emptyList(),
+                                Arrays.asList(3, 3, 2, 2))));
+        testHarness.prepareSnapshotPreBarrier(1);
+        testHarness.processElement(
+                new StreamRecord<>(
+                        createCommittable(
+                                2,
+                                BinaryRow.EMPTY_ROW,
+                                0,
+                                Collections.emptyList(),
+                                Arrays.asList(2, 3))));
+        testHarness.prepareSnapshotPreBarrier(2);
+
+        List<Object> output = new ArrayList<>(testHarness.getOutput());
+        assertThat(output).hasSize(7);
+
+        Map<Integer, List<Integer>> expected = new HashMap<>();
+        expected.put(0, Arrays.asList(3, 2, 5));
+        assertCompactionTask(output.get(0), 1, BinaryRow.EMPTY_ROW, new 
HashMap<>(), expected);
+
+        expected.clear();
+        expected.put(0, Collections.singletonList(4));
+        expected.put(1, Arrays.asList(3, 3));
+        assertCompactionTask(output.get(2), 1, BinaryRow.EMPTY_ROW, new 
HashMap<>(), expected);
+
+        expected.clear();
+        expected.put(1, Arrays.asList(2, 2));
+        assertCompactionTask(output.get(4), 1, BinaryRow.EMPTY_ROW, new 
HashMap<>(), expected);
+
+        expected.clear();
+        expected.put(0, Arrays.asList(2, 3));
+        assertCompactionTask(output.get(6), 2, BinaryRow.EMPTY_ROW, new 
HashMap<>(), expected);
+
+        testHarness.close();
+    }
+
+    @Test
+    public void testPrepareSnapshotWithSingleFile() throws Exception {
+        Options options = new Options();
+        options.set(CoreOptions.TARGET_FILE_SIZE, MemorySize.ofMebiBytes(8));
+        ChangelogCompactCoordinateOperator operator =
+                new ChangelogCompactCoordinateOperator(new 
CoreOptions(options));
+        OneInputStreamOperatorTestHarness<Committable, Either<Committable, 
ChangelogCompactTask>>
+                testHarness = createTestHarness(operator);
+
+        testHarness.open();
+        testHarness.processElement(
+                new StreamRecord<>(
+                        createCommittable(
+                                1,
+                                BinaryRow.EMPTY_ROW,
+                                0,
+                                Arrays.asList(3, 5, 2),
+                                Collections.emptyList())));
+        testHarness.prepareSnapshotPreBarrier(1);
+
+        List<Object> output = new ArrayList<>(testHarness.getOutput());
+        assertThat(output).hasSize(3);
+
+        Map<Integer, List<Integer>> expected = new HashMap<>();
+        expected.put(0, Arrays.asList(3, 5));
+        assertCompactionTask(output.get(0), 1, BinaryRow.EMPTY_ROW, expected, 
new HashMap<>());
+        assertCommittable(
+                output.get(2),
+                BinaryRow.EMPTY_ROW,
+                Collections.singletonList(2),
+                Collections.emptyList());
+
+        testHarness.close();
+    }
+
+    @Test
+    public void testPrepareSnapshotWithMultiplePartitions() throws Exception {
+        Options options = new Options();
+        options.set(CoreOptions.TARGET_FILE_SIZE, MemorySize.ofMebiBytes(8));
+        ChangelogCompactCoordinateOperator operator =
+                new ChangelogCompactCoordinateOperator(new 
CoreOptions(options));
+        OneInputStreamOperatorTestHarness<Committable, Either<Committable, 
ChangelogCompactTask>>
+                testHarness = createTestHarness(operator);
+
+        Function<Integer, BinaryRow> binaryRow =
+                i -> {
+                    BinaryRow row = new BinaryRow(1);
+                    BinaryRowWriter writer = new BinaryRowWriter(row);
+                    writer.writeInt(0, i);
+                    writer.complete();
+                    return row;
+                };
+
+        testHarness.open();
+        testHarness.processElement(
+                new StreamRecord<>(
+                        createCommittable(
+                                1,
+                                binaryRow.apply(1),
+                                0,
+                                Collections.emptyList(),
+                                Arrays.asList(3, 2, 5, 4))));
+        testHarness.processElement(
+                new StreamRecord<>(
+                        createCommittable(
+                                1,
+                                binaryRow.apply(2),
+                                1,
+                                Collections.emptyList(),
+                                Arrays.asList(3, 3, 2, 2, 3))));
+        testHarness.prepareSnapshotPreBarrier(1);
+        testHarness.processElement(
+                new StreamRecord<>(
+                        createCommittable(
+                                2,
+                                binaryRow.apply(1),
+                                0,
+                                Collections.emptyList(),
+                                Arrays.asList(2, 3))));
+        testHarness.prepareSnapshotPreBarrier(2);
+
+        List<Object> output = new ArrayList<>(testHarness.getOutput());
+        assertThat(output).hasSize(8);
+
+        Map<Integer, List<Integer>> expected = new HashMap<>();
+        expected.put(0, Arrays.asList(3, 2, 5));
+        assertCompactionTask(output.get(0), 1, binaryRow.apply(1), new 
HashMap<>(), expected);
+
+        expected.clear();
+        expected.put(1, Arrays.asList(3, 3, 2));
+        assertCompactionTask(output.get(2), 1, binaryRow.apply(2), new 
HashMap<>(), expected);
+
+        assertCommittable(
+                output.get(4),
+                binaryRow.apply(1),
+                Collections.emptyList(),
+                Collections.singletonList(4));
+
+        expected.clear();
+        expected.put(1, Arrays.asList(2, 3));
+        assertCompactionTask(output.get(5), 1, binaryRow.apply(2), new 
HashMap<>(), expected);
+
+        expected.clear();
+        expected.put(0, Arrays.asList(2, 3));
+        assertCompactionTask(output.get(7), 2, binaryRow.apply(1), new 
HashMap<>(), expected);
+
+        testHarness.close();
+    }
+
+    @Test
+    public void testSkipLargeFiles() throws Exception {
+        Options options = new Options();
+        options.set(CoreOptions.TARGET_FILE_SIZE, MemorySize.ofMebiBytes(8));
+        ChangelogCompactCoordinateOperator operator =
+                new ChangelogCompactCoordinateOperator(new 
CoreOptions(options));
+        OneInputStreamOperatorTestHarness<Committable, Either<Committable, 
ChangelogCompactTask>>
+                testHarness = createTestHarness(operator);
+
+        testHarness.open();
+        testHarness.processElement(
+                new StreamRecord<>(
+                        createCommittable(
+                                1,
+                                BinaryRow.EMPTY_ROW,
+                                0,
+                                Collections.emptyList(),
+                                Arrays.asList(3, 10, 5, 9))));
+        testHarness.prepareSnapshotPreBarrier(1);
+
+        List<Object> output = new ArrayList<>(testHarness.getOutput());
+        assertThat(output).hasSize(2);
+
+        Map<Integer, List<Integer>> expected = new HashMap<>();
+        expected.put(0, Arrays.asList(3, 5));
+        assertCompactionTask(output.get(0), 1, BinaryRow.EMPTY_ROW, new 
HashMap<>(), expected);
+        assertCommittable(
+                output.get(1), BinaryRow.EMPTY_ROW, Collections.emptyList(), 
Arrays.asList(10, 9));
+
+        testHarness.close();
+    }
+
+    @SuppressWarnings("unchecked")
+    private void assertCommittable(
+            Object o,
+            BinaryRow partition,
+            List<Integer> newFilesChangelogMbs,
+            List<Integer> compactChangelogMbs) {
+        StreamRecord<Either<Committable, ChangelogCompactTask>> record =
+                (StreamRecord<Either<Committable, ChangelogCompactTask>>) o;
+        assertThat(record.getValue().isLeft()).isTrue();
+        Committable committable = record.getValue().left();
+
+        assertThat(committable.checkpointId()).isEqualTo(1);
+        CommitMessageImpl message = (CommitMessageImpl) 
committable.wrappedCommittable();
+        assertThat(message.partition()).isEqualTo(partition);
+        assertThat(message.bucket()).isEqualTo(0);
+
+        assertSameSizes(message.newFilesIncrement().changelogFiles(), 
newFilesChangelogMbs);
+        assertSameSizes(message.compactIncrement().changelogFiles(), 
compactChangelogMbs);
+    }
+
+    @SuppressWarnings("unchecked")
+    private void assertCompactionTask(
+            Object o,
+            long checkpointId,
+            BinaryRow partition,
+            Map<Integer, List<Integer>> newFilesChangelogMbs,
+            Map<Integer, List<Integer>> compactChangelogMbs) {
+        StreamRecord<Either<Committable, ChangelogCompactTask>> record =
+                (StreamRecord<Either<Committable, ChangelogCompactTask>>) o;
+        assertThat(record.getValue().isRight()).isTrue();
+        ChangelogCompactTask task = record.getValue().right();
+
+        assertThat(task.checkpointId()).isEqualTo(checkpointId);
+        assertThat(task.partition()).isEqualTo(partition);
+
+        
assertThat(task.newFileChangelogFiles().keySet()).isEqualTo(newFilesChangelogMbs.keySet());
+        for (int bucket : task.newFileChangelogFiles().keySet()) {
+            assertSameSizes(
+                    task.newFileChangelogFiles().get(bucket), 
newFilesChangelogMbs.get(bucket));
+        }
+        
assertThat(task.compactChangelogFiles().keySet()).isEqualTo(compactChangelogMbs.keySet());
+        for (int bucket : task.compactChangelogFiles().keySet()) {
+            assertSameSizes(
+                    task.compactChangelogFiles().get(bucket), 
compactChangelogMbs.get(bucket));
+        }
+    }
+
+    private void assertSameSizes(List<DataFileMeta> metas, List<Integer> mbs) {
+        assertThat(metas.stream().mapToLong(DataFileMeta::fileSize).toArray())
+                .containsExactlyInAnyOrder(
+                        mbs.stream()
+                                .mapToLong(mb -> 
MemorySize.ofMebiBytes(mb).getBytes())
+                                .toArray());
+    }
+
+    private Committable createCommittable(
+            long checkpointId,
+            BinaryRow partition,
+            int bucket,
+            List<Integer> newFilesChangelogMbs,
+            List<Integer> compactChangelogMbs) {
+        CommitMessageImpl message =
+                new CommitMessageImpl(
+                        partition,
+                        bucket,
+                        2,
+                        new DataIncrement(
+                                Collections.emptyList(),
+                                Collections.emptyList(),
+                                newFilesChangelogMbs.stream()
+                                        .map(this::createDataFileMetaOfSize)
+                                        .collect(Collectors.toList())),
+                        new CompactIncrement(
+                                Collections.emptyList(),
+                                Collections.emptyList(),
+                                compactChangelogMbs.stream()
+                                        .map(this::createDataFileMetaOfSize)
+                                        .collect(Collectors.toList())));
+        return new Committable(checkpointId, Committable.Kind.FILE, message);
+    }
+
+    private DataFileMeta createDataFileMetaOfSize(int mb) {
+        return DataFileMeta.forAppend(
+                UUID.randomUUID().toString(),
+                MemorySize.ofMebiBytes(mb).getBytes(),
+                0,
+                SimpleStats.EMPTY_STATS,
+                0,
+                0,
+                1,
+                Collections.emptyList(),
+                null,
+                null,
+                null,
+                null);
+    }
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    private OneInputStreamOperatorTestHarness<
+                    Committable, Either<Committable, ChangelogCompactTask>>
+            createTestHarness(ChangelogCompactCoordinateOperator operator) 
throws Exception {
+        TypeSerializer serializer =
+                new EitherSerializer<>(
+                        new CommittableTypeInfo().createSerializer(new 
ExecutionConfig()),
+                        new ChangelogTaskTypeInfo().createSerializer(new 
ExecutionConfig()));
+        OneInputStreamOperatorTestHarness harness =
+                new OneInputStreamOperatorTestHarness(operator, 1, 1, 0);
+        
harness.getStreamConfig().setupNetworkInputs(Preconditions.checkNotNull(serializer));
+        harness.getStreamConfig().serializeAllConfigs();
+        harness.setup(serializer);
+        return harness;
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializerTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializerTest.java
index 7fcde6214f..580220e77d 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializerTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskSerializerTest.java
@@ -56,13 +56,24 @@ public class ChangelogCompactTaskSerializerTest {
                                 put(1, newFiles(20));
                             }
                         },
+                        new HashMap<>());
+        ChangelogCompactTask serializeTask =
+                serializer.deserialize(serializer.getVersion(), 
serializer.serialize(task));
+        assertThat(task).isEqualTo(serializeTask);
+
+        task =
+                new ChangelogCompactTask(
+                        2L,
+                        partition,
+                        2,
+                        new HashMap<>(),
                         new HashMap<Integer, List<DataFileMeta>>() {
                             {
                                 put(0, newFiles(10));
                                 put(1, newFiles(10));
                             }
                         });
-        ChangelogCompactTask serializeTask = serializer.deserialize(2, 
serializer.serialize(task));
+        serializeTask = serializer.deserialize(2, serializer.serialize(task));
         assertThat(task).isEqualTo(serializeTask);
     }
 

Reply via email to