This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e732f2e7ee [flink] Fix deadlock and incorrect order in precommit changelog compaction (#5797)
e732f2e7ee is described below

commit e732f2e7ee683558d885866cd39d332b304cec34
Author: tsreaper <[email protected]>
AuthorDate: Thu Jun 26 17:44:11 2025 +0800

    [flink] Fix deadlock and incorrect order in precommit changelog compaction (#5797)
---
 .../org/apache/paimon/compact/CompactTask.java     |   2 +-
 .../ChangelogCompactCoordinateOperator.java        |  13 +-
 .../changelog/ChangelogCompactSortOperator.java    | 175 +++++++++++++++++
 .../compact/changelog/ChangelogCompactTask.java    |  49 +++--
 .../org/apache/paimon/flink/sink/FlinkSink.java    |  14 +-
 .../flink/PrimaryKeyFileStoreTableITCase.java      |   6 +-
 .../ChangelogCompactSortOperatorTest.java          | 216 +++++++++++++++++++++
 .../changelog/ChangelogCompactTaskTest.java        |  98 ++++++++++
 8 files changed, 540 insertions(+), 33 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java b/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java
index c8da0f3ef2..69e68949c3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java
+++ b/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java
@@ -69,7 +69,7 @@ public abstract class CompactTask implements Callable<CompactResult> {
                     LOG);
 
             if (LOG.isDebugEnabled()) {
-                logMetric(startMillis, result.before(), result.after());
+                LOG.debug(logMetric(startMillis, result.before(), result.after()));
             }
             return result;
         } finally {
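
(Context for the one-line change above: logMetric(...) only formats and returns
the metric string. The old code invoked it inside the isDebugEnabled() guard but
discarded the return value, so the metrics were never actually written; the fix
passes the returned string to LOG.debug(...).)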
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java
index 34553ebe5f..0190279a8b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java
@@ -56,6 +56,7 @@ public class ChangelogCompactCoordinateOperator
 
     private transient long checkpointId;
     private transient Map<BinaryRow, PartitionChangelog> partitionChangelogs;
+    private transient Map<BinaryRow, Integer> numBuckets;
 
     public ChangelogCompactCoordinateOperator(CoreOptions options) {
         this.options = options;
@@ -67,6 +68,7 @@ public class ChangelogCompactCoordinateOperator
 
         checkpointId = Long.MIN_VALUE;
         partitionChangelogs = new LinkedHashMap<>();
+        numBuckets = new LinkedHashMap<>();
     }
 
     public void processElement(StreamRecord<Committable> record) {
@@ -84,6 +86,8 @@ public class ChangelogCompactCoordinateOperator
             return;
         }
 
+        numBuckets.put(message.partition(), message.totalBuckets());
+
         // Changelog files are not stored in an LSM tree,
         // so we can regard them as files without primary keys.
         long targetFileSize = options.targetFileSize(false);
@@ -145,10 +149,12 @@ public class ChangelogCompactCoordinateOperator
         output.collect(new StreamRecord<>(Either.Left(newCommittable)));
     }
 
+    @Override
     public void prepareSnapshotPreBarrier(long checkpointId) {
         emitAllPartitionsChangelogCompactTask();
     }
 
+    @Override
     public void endInput() {
         emitAllPartitionsChangelogCompactTask();
     }
@@ -174,7 +180,7 @@ public class ChangelogCompactCoordinateOperator
                         new CommitMessageImpl(
                                 partition,
                                 entry.getKey(),
-                                options.bucket(),
+                                numBuckets.get(partition),
                                 new DataIncrement(
                                         Collections.emptyList(),
                                         Collections.emptyList(),
@@ -187,7 +193,7 @@ public class ChangelogCompactCoordinateOperator
                         new CommitMessageImpl(
                                 partition,
                                 entry.getKey(),
-                                options.bucket(),
+                                numBuckets.get(partition),
                                 DataIncrement.emptyIncrement(),
                                 new CompactIncrement(
                                         Collections.emptyList(),
@@ -204,7 +210,7 @@ public class ChangelogCompactCoordinateOperator
                                     new ChangelogCompactTask(
                                             checkpointId,
                                             partition,
-                                            options.bucket(),
+                                            numBuckets.get(partition),
                                             
partitionChangelog.newFileChangelogFiles,
                                             
partitionChangelog.compactChangelogFiles))));
         }
@@ -216,6 +222,7 @@ public class ChangelogCompactCoordinateOperator
         for (BinaryRow partition : partitions) {
             emitPartitionChangelogCompactTask(partition);
         }
+        numBuckets.clear();
     }
 
     private static class PartitionChangelog {
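
Note on the numBuckets bookkeeping above: the coordinator used to stamp every
rewritten commit message and compact task with options.bucket(), which does not
reflect the real bucket count when that is not a fixed table option (for
example with dynamic bucket mode). The operator therefore now records the
totalBuckets() carried by each incoming commit message, keyed by partition, and
reuses that value when rebuilding messages. A minimal sketch of the pattern,
assuming a message type exposing partition() and totalBuckets() as in the hunks
above (onCommitMessage/totalBucketsOf are hypothetical names):

    // Sketch only: per-partition bucket counts, cleared after each emission round.
    Map<BinaryRow, Integer> numBuckets = new LinkedHashMap<>();

    void onCommitMessage(CommitMessageImpl message) {
        // Remember how many buckets this partition actually has.
        numBuckets.put(message.partition(), message.totalBuckets());
    }

    int totalBucketsOf(BinaryRow partition) {
        // Used instead of options.bucket() when rebuilding commit messages.
        return numBuckets.get(partition);
    }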
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactSortOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactSortOperator.java
new file mode 100644
index 0000000000..92e0110f27
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactSortOperator.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.compact.changelog;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.io.IndexIncrement;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.function.BiConsumer;
+
+/** Operator to sort changelog files from each bucket by their creation time. */
+public class ChangelogCompactSortOperator extends AbstractStreamOperator<Committable>
+        implements OneInputStreamOperator<Committable, Committable>, BoundedOneInput {
+
+    private transient Map<BinaryRow, Map<Integer, List<DataFileMeta>>> newFileChangelogFiles;
+    private transient Map<BinaryRow, Map<Integer, List<DataFileMeta>>> compactChangelogFiles;
+    private transient Map<BinaryRow, Integer> numBuckets;
+
+    @Override
+    public void open() {
+        newFileChangelogFiles = new LinkedHashMap<>();
+        compactChangelogFiles = new LinkedHashMap<>();
+        numBuckets = new LinkedHashMap<>();
+    }
+
+    @Override
+    public void processElement(StreamRecord<Committable> record) throws Exception {
+        Committable committable = record.getValue();
+        if (committable.kind() != Committable.Kind.FILE) {
+            output.collect(record);
+            return;
+        }
+
+        CommitMessageImpl message = (CommitMessageImpl) committable.wrappedCommittable();
+        if (message.newFilesIncrement().changelogFiles().isEmpty()
+                && message.compactIncrement().changelogFiles().isEmpty()) {
+            output.collect(record);
+            return;
+        }
+
+        numBuckets.put(message.partition(), message.totalBuckets());
+
+        BiConsumer<DataFileMeta, Map<BinaryRow, Map<Integer, List<DataFileMeta>>>> addChangelog =
+                (meta, changelogFiles) ->
+                        changelogFiles
+                                .computeIfAbsent(message.partition(), p -> new TreeMap<>())
+                                .computeIfAbsent(message.bucket(), b -> new ArrayList<>())
+                                .add(meta);
+        for (DataFileMeta meta : message.newFilesIncrement().changelogFiles()) {
+            addChangelog.accept(meta, newFileChangelogFiles);
+        }
+        for (DataFileMeta meta : message.compactIncrement().changelogFiles()) {
+            addChangelog.accept(meta, compactChangelogFiles);
+        }
+
+        CommitMessageImpl newMessage =
+                new CommitMessageImpl(
+                        message.partition(),
+                        message.bucket(),
+                        message.totalBuckets(),
+                        new DataIncrement(
+                                message.newFilesIncrement().newFiles(),
+                                message.newFilesIncrement().deletedFiles(),
+                                Collections.emptyList()),
+                        new CompactIncrement(
+                                message.compactIncrement().compactBefore(),
+                                message.compactIncrement().compactAfter(),
+                                Collections.emptyList()),
+                        message.indexIncrement());
+        if (!newMessage.isEmpty()) {
+            Committable newCommittable =
+                    new Committable(committable.checkpointId(), Committable.Kind.FILE, newMessage);
+            output.collect(new StreamRecord<>(newCommittable));
+        }
+    }
+
+    @Override
+    public void prepareSnapshotPreBarrier(long checkpointId) {
+        emitAll(checkpointId);
+    }
+
+    @Override
+    public void endInput() {
+        emitAll(Long.MAX_VALUE);
+    }
+
+    private void emitAll(long checkpointId) {
+        Map<BinaryRow, Set<Integer>> activeBuckets = new LinkedHashMap<>();
+        collectActiveBuckets(newFileChangelogFiles, activeBuckets);
+        collectActiveBuckets(compactChangelogFiles, activeBuckets);
+
+        for (Map.Entry<BinaryRow, Set<Integer>> entry : activeBuckets.entrySet()) {
+            BinaryRow partition = entry.getKey();
+            for (int bucket : entry.getValue()) {
+                CommitMessageImpl newMessage =
+                        new CommitMessageImpl(
+                                partition,
+                                bucket,
+                                numBuckets.get(partition),
+                                new DataIncrement(
+                                        Collections.emptyList(),
+                                        Collections.emptyList(),
+                                        sortedChangelogs(newFileChangelogFiles, partition, bucket)),
+                                new CompactIncrement(
+                                        Collections.emptyList(),
+                                        Collections.emptyList(),
+                                        sortedChangelogs(compactChangelogFiles, partition, bucket)),
+                                new IndexIncrement(Collections.emptyList()));
+                Committable newCommittable =
+                        new Committable(checkpointId, Committable.Kind.FILE, newMessage);
+                output.collect(new StreamRecord<>(newCommittable));
+            }
+        }
+
+        newFileChangelogFiles.clear();
+        compactChangelogFiles.clear();
+        numBuckets.clear();
+    }
+
+    private void collectActiveBuckets(
+            Map<BinaryRow, Map<Integer, List<DataFileMeta>>> from,
+            Map<BinaryRow, Set<Integer>> activeBuckets) {
+        for (Map.Entry<BinaryRow, Map<Integer, List<DataFileMeta>>> entry : from.entrySet()) {
+            activeBuckets
+                    .computeIfAbsent(entry.getKey(), k -> new TreeSet<>())
+                    .addAll(entry.getValue().keySet());
+        }
+    }
+
+    private List<DataFileMeta> sortedChangelogs(
+            Map<BinaryRow, Map<Integer, List<DataFileMeta>>> from,
+            BinaryRow partition,
+            int bucket) {
+        List<DataFileMeta> result = new ArrayList<>();
+        if (from.containsKey(partition) && from.get(partition).containsKey(bucket)) {
+            result.addAll(from.get(partition).get(bucket));
+        }
+        result.sort(Comparator.comparingLong(DataFileMeta::creationTimeEpochMillis));
+        return result;
+    }
+}
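
The ordering guarantee of the new operator comes down to the comparator in
sortedChangelogs: within each (partition, bucket) pair, the changelog files
buffered during a checkpoint are re-emitted sorted by creation time. A tiny
illustration of that behaviour (a sketch, not part of the commit;
collectedForBucket is a hypothetical name):

    // Files may arrive out of order from the coordinator/worker subtasks...
    List<DataFileMeta> changelogs = new ArrayList<>(collectedForBucket);
    // ...but are emitted in creation-time order, restoring the write order.
    changelogs.sort(Comparator.comparingLong(DataFileMeta::creationTimeEpochMillis));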
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
index 7605af61c4..86ec97ddde 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
@@ -40,7 +40,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -50,7 +49,6 @@ import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Semaphore;
 import java.util.function.BiConsumer;
@@ -148,37 +146,37 @@ public class ChangelogCompactTask implements Serializable {
 
         Semaphore semaphore = new Semaphore((int) bufferSize.getBytes());
         BlockingQueue<ReadTask> finishedTasks = new LinkedBlockingQueue<>();
-        List<Future<?>> futures =
-                ThreadPoolUtils.submitAllTasks(
-                        executor,
-                        t -> {
-                            // Why not create `finishedTasks` as a blocking queue and use it to
-                            // limit the total size of bytes awaiting to be copied? Because finished
-                            // tasks are added after their contents are read, so even if
-                            // `finishedTasks` is full, each thread can still read one more file,
-                            // and the limit will become `bytesInThreads + bufferSize`, not just
-                            // `bufferSize`.
-                            try {
-                                semaphore.acquire((int) t.meta.fileSize());
-                                t.readFully();
-                                finishedTasks.put(t);
-                            } catch (InterruptedException e) {
-                                Thread.currentThread().interrupt();
-                                throw new RuntimeException(e);
-                            }
-                        },
-                        tasks);
+        ThreadPoolUtils.submitAllTasks(
+                executor,
+                t -> {
+                    // Why not create `finishedTasks` as a blocking queue and use it to limit the
+                    // total size of bytes awaiting to be copied? Because finished tasks are added
+                    // after their contents are read, so even if `finishedTasks` is full, each
+                    // thread can still read one more file, and the limit will become
+                    // `bytesInThreads + bufferSize`, not just `bufferSize`.
+                    try {
+                        semaphore.acquire((int) t.meta.fileSize());
+                        t.readFully();
+                        finishedTasks.put(t);
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        throw new RuntimeException(e);
+                    }
+                },
+                tasks);
 
         OutputStream outputStream = new OutputStream();
         List<Result> results = new ArrayList<>();
         for (int i = 0; i < tasks.size(); i++) {
             // copy all files into a new big file
             ReadTask task = finishedTasks.take();
+            if (task.exception != null) {
+                throw task.exception;
+            }
             write(task, outputStream, results);
             semaphore.release((int) task.meta.fileSize());
         }
         outputStream.out.close();
-        ThreadPoolUtils.awaitAllFutures(futures);
 
         return produceNewCommittables(results, table, pathFactory, outputStream.path);
     }
@@ -316,6 +314,7 @@ public class ChangelogCompactTask implements Serializable {
         private final DataFileMeta meta;
 
         private byte[] result = null;
+        private Exception exception = null;
 
         private ReadTask(
                 FileStoreTable table,
@@ -334,8 +333,8 @@ public class ChangelogCompactTask implements Serializable {
             try {
                 result = IOUtils.readFully(table.fileIO().newInputStream(path), true);
                 table.fileIO().deleteQuietly(path);
-            } catch (IOException e) {
-                throw new UncheckedIOException(e);
+            } catch (Exception e) {
+                exception = e;
             }
         }
     }
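
The hunks above are the deadlock fix itself. Previously, a failed read threw
UncheckedIOException inside the worker thread, so the failed ReadTask was never
put into finishedTasks; the main thread then blocked forever in
finishedTasks.take(), and awaitAllFutures, which would have surfaced the error,
was only reached after that loop. Now readFully() catches the exception and
stores it on the task, the task is always enqueued, and the consumer rethrows.
A simplified sketch of the hand-off (names follow the code above; the semaphore
and thread-pool wiring are omitted):

    BlockingQueue<ReadTask> finishedTasks = new LinkedBlockingQueue<>();

    // Producer (worker thread): must never fail before enqueueing its task.
    void runTask(ReadTask t) throws InterruptedException {
        t.readFully();          // catches internally and records t.exception
        finishedTasks.put(t);   // reached even when the read failed
    }

    // Consumer (main thread): take() is now guaranteed to return once per task,
    // so a failure surfaces as an exception instead of a hang.
    ReadTask task = finishedTasks.take();
    if (task.exception != null) {
        throw task.exception;
    }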
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 3431f34cc5..6349dc3972 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.CoreOptions.ChangelogProducer;
 import org.apache.paimon.CoreOptions.TagCreationMode;
 import org.apache.paimon.flink.compact.changelog.ChangelogCompactCoordinateOperator;
+import org.apache.paimon.flink.compact.changelog.ChangelogCompactSortOperator;
 import org.apache.paimon.flink.compact.changelog.ChangelogCompactWorkerOperator;
 import org.apache.paimon.flink.compact.changelog.ChangelogTaskTypeInfo;
 import org.apache.paimon.manifest.ManifestCommittable;
@@ -248,7 +249,7 @@ public abstract class FlinkSink<T> implements Serializable {
                 written, options.get(SINK_WRITER_CPU), options.get(SINK_WRITER_MEMORY));
 
         if (!table.primaryKeys().isEmpty() && options.get(PRECOMMIT_COMPACT)) {
-            SingleOutputStreamOperator<Committable> newWritten =
+            SingleOutputStreamOperator<Committable> beforeSort =
                     written.transform(
                                     "Changelog Compact Coordinator",
                                     new EitherTypeInfo<>(
@@ -259,8 +260,15 @@ public abstract class FlinkSink<T> implements Serializable {
                                     "Changelog Compact Worker",
                                     new CommittableTypeInfo(),
                                     new ChangelogCompactWorkerOperator(table));
-            forwardParallelism(newWritten, written);
-            written = newWritten;
+            forwardParallelism(beforeSort, written);
+
+            written =
+                    beforeSort
+                            .transform(
+                                    "Changelog Sort by Creation Time",
+                                    new CommittableTypeInfo(),
+                                    new ChangelogCompactSortOperator())
+                            .forceNonParallel();
         }
 
         return written;
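
Note the forceNonParallel() on the new stage: sorting by creation time is only
meaningful if a single operator instance observes every changelog committable,
so the sort operator runs with parallelism 1, while the coordinator and worker
keep the writer parallelism via forwardParallelism(beforeSort, written).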
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
index 5958cc632e..2c9bbaffd0 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
@@ -1263,6 +1263,10 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
                         + tableProperties
                         + ")");
 
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Creating test table with properties {}", tableProperties);
+        }
+
         // input data must be strictly ordered
         tEnv.getConfig()
                 .getConfiguration()
@@ -1298,7 +1302,7 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
                                 + "  'fields.i.kind' = 'sequence',"
                                 + "  'fields.i.start' = '0',"
                                 + "  'fields.i.end' = '"
-                                + (usefulNumRows - 1) * factor
+                                + (usefulNumRows * factor - 1)
                                 + "',"
                                 + "  'number-of-rows' = '"
                                 + usefulNumRows * factor
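
The bound fix above corrects the sequence range: a datagen sequence field
covering start..end inclusive yields end - start + 1 distinct values, so for
the range to agree with 'number-of-rows' = usefulNumRows * factor, the end must
be usefulNumRows * factor - 1. For example, with usefulNumRows = 100 and
factor = 3 the source produces 300 rows, so the sequence must end at 299; the
old bound (100 - 1) * 3 = 297 covers only 298 values.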
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactSortOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactSortOperatorTest.java
new file mode 100644
index 0000000000..6b47256a5c
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactSortOperatorTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.compact.changelog;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.sink.CommittableTypeInfo;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Preconditions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.function.Function;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link ChangelogCompactSortOperator}. */
+public class ChangelogCompactSortOperatorTest {
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testChangelogSorted() throws Exception {
+        ChangelogCompactSortOperator operator = new ChangelogCompactSortOperator();
+        OneInputStreamOperatorTestHarness<Committable, Committable> testHarness =
+                createTestHarness(operator);
+
+        Function<Integer, BinaryRow> binaryRow =
+                i -> {
+                    BinaryRow row = new BinaryRow(1);
+                    BinaryRowWriter writer = new BinaryRowWriter(row);
+                    writer.writeInt(0, i);
+                    writer.complete();
+                    return row;
+                };
+
+        testHarness.open();
+
+        List<DataFileMeta> files = new ArrayList<>();
+        for (int i = 0; i <= 10; i++) {
+            files.add(createDataFileMeta(i, i * 100));
+        }
+
+        CommitMessageImpl onlyData =
+                new CommitMessageImpl(
+                        binaryRow.apply(0),
+                        0,
+                        2,
+                        new DataIncrement(
+                                Arrays.asList(files.get(2), files.get(1)),
+                                Collections.emptyList(),
+                                Collections.emptyList()),
+                        CompactIncrement.emptyIncrement());
+        testHarness.processElement(
+                new StreamRecord<>(new Committable(1, Committable.Kind.FILE, onlyData)));
+
+        CommitMessageImpl onlyChangelogBucket0 =
+                new CommitMessageImpl(
+                        binaryRow.apply(0),
+                        0,
+                        2,
+                        new DataIncrement(
+                                Collections.emptyList(),
+                                Collections.emptyList(),
+                                Arrays.asList(files.get(4), files.get(3))),
+                        CompactIncrement.emptyIncrement());
+        testHarness.processElement(
+                new StreamRecord<>(
+                        new Committable(1, Committable.Kind.FILE, onlyChangelogBucket0)));
+
+        CommitMessageImpl onlyChangelogBucket1 =
+                new CommitMessageImpl(
+                        binaryRow.apply(0),
+                        1,
+                        2,
+                        new DataIncrement(
+                                Collections.emptyList(),
+                                Collections.emptyList(),
+                                Arrays.asList(files.get(7), files.get(8))),
+                        CompactIncrement.emptyIncrement());
+        testHarness.processElement(
+                new StreamRecord<>(
+                        new Committable(1, Committable.Kind.FILE, onlyChangelogBucket1)));
+
+        CommitMessageImpl mixed =
+                new CommitMessageImpl(
+                        binaryRow.apply(0),
+                        1,
+                        2,
+                        new DataIncrement(
+                                Arrays.asList(files.get(10), files.get(9)),
+                                Collections.emptyList(),
+                                Arrays.asList(files.get(6), files.get(5))),
+                        CompactIncrement.emptyIncrement());
+        testHarness.processElement(
+                new StreamRecord<>(new Committable(1, Committable.Kind.FILE, mixed)));
+
+        testHarness.prepareSnapshotPreBarrier(1);
+
+        List<Object> output = new ArrayList<>(testHarness.getOutput());
+        assertThat(output).hasSize(4);
+
+        List<CommitMessageImpl> actual = new ArrayList<>();
+        for (Object o : output) {
+            actual.add(
+                    (CommitMessageImpl)
+                            ((StreamRecord<Committable>) o).getValue().wrappedCommittable());
+        }
+
+        assertThat(actual.get(0)).isEqualTo(onlyData);
+        assertThat(actual.get(1))
+                .isEqualTo(
+                        new CommitMessageImpl(
+                                binaryRow.apply(0),
+                                1,
+                                2,
+                                new DataIncrement(
+                                        Arrays.asList(files.get(10), files.get(9)),
+                                        Collections.emptyList(),
+                                        Collections.emptyList()),
+                                CompactIncrement.emptyIncrement()));
+        assertThat(actual.get(2))
+                .isEqualTo(
+                        new CommitMessageImpl(
+                                binaryRow.apply(0),
+                                0,
+                                2,
+                                new DataIncrement(
+                                        Collections.emptyList(),
+                                        Collections.emptyList(),
+                                        Arrays.asList(files.get(3), files.get(4))),
+                                CompactIncrement.emptyIncrement()));
+        assertThat(actual.get(3))
+                .isEqualTo(
+                        new CommitMessageImpl(
+                                binaryRow.apply(0),
+                                1,
+                                2,
+                                new DataIncrement(
+                                        Collections.emptyList(),
+                                        Collections.emptyList(),
+                                        Arrays.asList(
+                                                files.get(5),
+                                                files.get(6),
+                                                files.get(7),
+                                                files.get(8))),
+                                CompactIncrement.emptyIncrement()));
+
+        testHarness.close();
+    }
+
+    private DataFileMeta createDataFileMeta(int mb, long creationMillis) {
+        return new DataFileMeta(
+                UUID.randomUUID().toString(),
+                MemorySize.ofMebiBytes(mb).getBytes(),
+                0,
+                null,
+                null,
+                null,
+                null,
+                0,
+                0,
+                0,
+                0,
+                Collections.emptyList(),
+                Timestamp.fromEpochMillis(creationMillis),
+                null,
+                null,
+                null,
+                null,
+                null);
+    }
+
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    private OneInputStreamOperatorTestHarness<Committable, Committable> createTestHarness(
+            ChangelogCompactSortOperator operator) throws Exception {
+        TypeSerializer serializer =
+                new CommittableTypeInfo().createSerializer(new ExecutionConfig());
+        OneInputStreamOperatorTestHarness harness =
+                new OneInputStreamOperatorTestHarness(operator, 1, 1, 0);
+        harness.getStreamConfig().setupNetworkInputs(Preconditions.checkNotNull(serializer));
+        harness.getStreamConfig().serializeAllConfigs();
+        harness.setup(serializer);
+        return harness;
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskTest.java
new file mode 100644
index 0000000000..fa567e5490
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.compact.changelog;
+
+import org.apache.paimon.catalog.FileSystemCatalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.FileNotFoundException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link ChangelogCompactTask}. */
+public class ChangelogCompactTaskTest {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    @Test
+    public void testExceptionWhenRead() throws Exception {
+        FileSystemCatalog catalog =
+                new FileSystemCatalog(LocalFileIO.create(), new Path(tempDir.toString()));
+        catalog.createDatabase("default", false);
+        catalog.createTable(
+                Identifier.create("default", "T"),
+                new Schema(
+                        Arrays.asList(
+                                new DataField(0, "k", DataTypes.INT()),
+                                new DataField(1, "v", DataTypes.INT())),
+                        Collections.emptyList(),
+                        Collections.singletonList("k"),
+                        new HashMap<>(),
+                        ""),
+                false);
+
+        Map<Integer, List<DataFileMeta>> files = new HashMap<>();
+        files.put(
+                0,
+                Collections.singletonList(
+                        DataFileMeta.forAppend(
+                                "unexisting-file",
+                                128,
+                                0,
+                                SimpleStats.EMPTY_STATS,
+                                0,
+                                0,
+                                1,
+                                Collections.emptyList(),
+                                null,
+                                null,
+                                null,
+                                null)));
+        ChangelogCompactTask task =
+                new ChangelogCompactTask(1, BinaryRow.EMPTY_ROW, 1, files, new HashMap<>());
+        assertThatThrownBy(
+                        () ->
+                                task.doCompact(
+                                        (FileStoreTable)
+                                                catalog.getTable(Identifier.create("default", "T")),
+                                        Executors.newFixedThreadPool(1),
+                                        MemorySize.ofMebiBytes(64)))
+                .isInstanceOf(FileNotFoundException.class)
+                .hasMessageContaining("unexisting-file");
+    }
+}

