This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e732f2e7ee [flink] Fix deadlock and incorrect order in precommit changelog compaction (#5797)
e732f2e7ee is described below
commit e732f2e7ee683558d885866cd39d332b304cec34
Author: tsreaper <[email protected]>
AuthorDate: Thu Jun 26 17:44:11 2025 +0800
[flink] Fix deadlock and incorrect order in precommit changelog compaction (#5797)
---
.../org/apache/paimon/compact/CompactTask.java | 2 +-
.../ChangelogCompactCoordinateOperator.java | 13 +-
.../changelog/ChangelogCompactSortOperator.java | 175 +++++++++++++++++
.../compact/changelog/ChangelogCompactTask.java | 49 +++--
.../org/apache/paimon/flink/sink/FlinkSink.java | 14 +-
.../flink/PrimaryKeyFileStoreTableITCase.java | 6 +-
.../ChangelogCompactSortOperatorTest.java | 216 +++++++++++++++++++++
.../changelog/ChangelogCompactTaskTest.java | 98 ++++++++++
8 files changed, 540 insertions(+), 33 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java b/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java
index c8da0f3ef2..69e68949c3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java
+++ b/paimon-core/src/main/java/org/apache/paimon/compact/CompactTask.java
@@ -69,7 +69,7 @@ public abstract class CompactTask implements Callable<CompactResult> {
LOG);
if (LOG.isDebugEnabled()) {
- logMetric(startMillis, result.before(), result.after());
+ LOG.debug(logMetric(startMillis, result.before(), result.after()));
}
return result;
} finally {
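For context: logMetric(...) builds and returns the metric string rather than logging it, which is why the fix wraps the call in LOG.debug. A minimal standalone sketch of the corrected pattern, with illustrative names:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    class CompactMetricLoggingSketch {
        private static final Logger LOG = LoggerFactory.getLogger(CompactMetricLoggingSketch.class);

        // Assumed shape of logMetric: it only formats and returns the message.
        private static String logMetric(long startMillis) {
            return "compaction cost " + (System.currentTimeMillis() - startMillis) + " ms";
        }

        static void afterCompact(long startMillis) {
            if (LOG.isDebugEnabled()) {
                // The fix: hand the returned string to the logger instead of
                // computing it and silently dropping the result.
                LOG.debug(logMetric(startMillis));
            }
        }
    }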
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java
index 34553ebe5f..0190279a8b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactCoordinateOperator.java
@@ -56,6 +56,7 @@ public class ChangelogCompactCoordinateOperator
private transient long checkpointId;
private transient Map<BinaryRow, PartitionChangelog> partitionChangelogs;
+ private transient Map<BinaryRow, Integer> numBuckets;
public ChangelogCompactCoordinateOperator(CoreOptions options) {
this.options = options;
@@ -67,6 +68,7 @@ public class ChangelogCompactCoordinateOperator
checkpointId = Long.MIN_VALUE;
partitionChangelogs = new LinkedHashMap<>();
+ numBuckets = new LinkedHashMap<>();
}
public void processElement(StreamRecord<Committable> record) {
@@ -84,6 +86,8 @@ public class ChangelogCompactCoordinateOperator
return;
}
+ numBuckets.put(message.partition(), message.totalBuckets());
+
// Changelog files are not stored in an LSM tree,
// so we can regard them as files without primary keys.
long targetFileSize = options.targetFileSize(false);
@@ -145,10 +149,12 @@ public class ChangelogCompactCoordinateOperator
output.collect(new StreamRecord<>(Either.Left(newCommittable)));
}
+ @Override
public void prepareSnapshotPreBarrier(long checkpointId) {
emitAllPartitionsChangelogCompactTask();
}
+ @Override
public void endInput() {
emitAllPartitionsChangelogCompactTask();
}
@@ -174,7 +180,7 @@ public class ChangelogCompactCoordinateOperator
new CommitMessageImpl(
partition,
entry.getKey(),
- options.bucket(),
+ numBuckets.get(partition),
new DataIncrement(
Collections.emptyList(),
Collections.emptyList(),
@@ -187,7 +193,7 @@ public class ChangelogCompactCoordinateOperator
new CommitMessageImpl(
partition,
entry.getKey(),
- options.bucket(),
+ numBuckets.get(partition),
DataIncrement.emptyIncrement(),
new CompactIncrement(
Collections.emptyList(),
@@ -204,7 +210,7 @@ public class ChangelogCompactCoordinateOperator
new ChangelogCompactTask(
checkpointId,
partition,
- options.bucket(),
+ numBuckets.get(partition),
partitionChangelog.newFileChangelogFiles,
partitionChangelog.compactChangelogFiles))));
}
@@ -216,6 +222,7 @@ public class ChangelogCompactCoordinateOperator
for (BinaryRow partition : partitions) {
emitPartitionChangelogCompactTask(partition);
}
+ numBuckets.clear();
}
private static class PartitionChangelog {
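For context, the coordinator now records totalBuckets() from each incoming commit message instead of reading options.bucket(), since the static table option need not match the actual bucket count of every partition (for example with dynamic bucket mode). A minimal sketch of that bookkeeping, illustrative only and not code from this commit:

    import java.util.LinkedHashMap;
    import java.util.Map;

    class BucketCountTracker<P> {
        private final Map<P, Integer> numBuckets = new LinkedHashMap<>();

        // Called for every commit message, mirroring processElement above.
        void onCommitMessage(P partition, int totalBuckets) {
            numBuckets.put(partition, totalBuckets);
        }

        // Used when emitting committables and compact tasks for a partition.
        Integer bucketsFor(P partition) {
            return numBuckets.get(partition);
        }

        // Cleared after all partitions are emitted, as in the operator above.
        void clearAfterEmit() {
            numBuckets.clear();
        }
    }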
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactSortOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactSortOperator.java
new file mode 100644
index 0000000000..92e0110f27
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactSortOperator.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.compact.changelog;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.io.IndexIncrement;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.function.BiConsumer;
+
+/** Operator to sort changelog files from each bucket by their creation time. */
+public class ChangelogCompactSortOperator extends AbstractStreamOperator<Committable>
+ implements OneInputStreamOperator<Committable, Committable>, BoundedOneInput {
+
+ private transient Map<BinaryRow, Map<Integer, List<DataFileMeta>>> newFileChangelogFiles;
+ private transient Map<BinaryRow, Map<Integer, List<DataFileMeta>>> compactChangelogFiles;
+ private transient Map<BinaryRow, Integer> numBuckets;
+
+ @Override
+ public void open() {
+ newFileChangelogFiles = new LinkedHashMap<>();
+ compactChangelogFiles = new LinkedHashMap<>();
+ numBuckets = new LinkedHashMap<>();
+ }
+
+ @Override
+ public void processElement(StreamRecord<Committable> record) throws Exception {
+ Committable committable = record.getValue();
+ if (committable.kind() != Committable.Kind.FILE) {
+ output.collect(record);
+ return;
+ }
+
+ CommitMessageImpl message = (CommitMessageImpl) committable.wrappedCommittable();
+ if (message.newFilesIncrement().changelogFiles().isEmpty()
+ && message.compactIncrement().changelogFiles().isEmpty()) {
+ output.collect(record);
+ return;
+ }
+
+ numBuckets.put(message.partition(), message.totalBuckets());
+
+ BiConsumer<DataFileMeta, Map<BinaryRow, Map<Integer, List<DataFileMeta>>>> addChangelog =
+ (meta, changelogFiles) ->
+ changelogFiles
+ .computeIfAbsent(message.partition(), p -> new TreeMap<>())
+ .computeIfAbsent(message.bucket(), b -> new ArrayList<>())
+ .add(meta);
+ for (DataFileMeta meta : message.newFilesIncrement().changelogFiles()) {
+ addChangelog.accept(meta, newFileChangelogFiles);
+ }
+ for (DataFileMeta meta : message.compactIncrement().changelogFiles()) {
+ addChangelog.accept(meta, compactChangelogFiles);
+ }
+
+ CommitMessageImpl newMessage =
+ new CommitMessageImpl(
+ message.partition(),
+ message.bucket(),
+ message.totalBuckets(),
+ new DataIncrement(
+ message.newFilesIncrement().newFiles(),
+ message.newFilesIncrement().deletedFiles(),
+ Collections.emptyList()),
+ new CompactIncrement(
+ message.compactIncrement().compactBefore(),
+ message.compactIncrement().compactAfter(),
+ Collections.emptyList()),
+ message.indexIncrement());
+ if (!newMessage.isEmpty()) {
+ Committable newCommittable =
+ new Committable(committable.checkpointId(), Committable.Kind.FILE, newMessage);
+ output.collect(new StreamRecord<>(newCommittable));
+ }
+ }
+
+ @Override
+ public void prepareSnapshotPreBarrier(long checkpointId) {
+ emitAll(checkpointId);
+ }
+
+ @Override
+ public void endInput() {
+ emitAll(Long.MAX_VALUE);
+ }
+
+ private void emitAll(long checkpointId) {
+ Map<BinaryRow, Set<Integer>> activeBuckets = new LinkedHashMap<>();
+ collectActiveBuckets(newFileChangelogFiles, activeBuckets);
+ collectActiveBuckets(compactChangelogFiles, activeBuckets);
+
+ for (Map.Entry<BinaryRow, Set<Integer>> entry : activeBuckets.entrySet()) {
+ BinaryRow partition = entry.getKey();
+ for (int bucket : entry.getValue()) {
+ CommitMessageImpl newMessage =
+ new CommitMessageImpl(
+ partition,
+ bucket,
+ numBuckets.get(partition),
+ new DataIncrement(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ sortedChangelogs(newFileChangelogFiles, partition, bucket)),
+ new CompactIncrement(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ sortedChangelogs(compactChangelogFiles, partition, bucket)),
+ new IndexIncrement(Collections.emptyList()));
+ Committable newCommittable =
+ new Committable(checkpointId, Committable.Kind.FILE, newMessage);
+ output.collect(new StreamRecord<>(newCommittable));
+ }
+ }
+
+ newFileChangelogFiles.clear();
+ compactChangelogFiles.clear();
+ numBuckets.clear();
+ }
+
+ private void collectActiveBuckets(
+ Map<BinaryRow, Map<Integer, List<DataFileMeta>>> from,
+ Map<BinaryRow, Set<Integer>> activeBuckets) {
+ for (Map.Entry<BinaryRow, Map<Integer, List<DataFileMeta>>> entry : from.entrySet()) {
+ activeBuckets
+ .computeIfAbsent(entry.getKey(), k -> new TreeSet<>())
+ .addAll(entry.getValue().keySet());
+ }
+ }
+
+ private List<DataFileMeta> sortedChangelogs(
+ Map<BinaryRow, Map<Integer, List<DataFileMeta>>> from,
+ BinaryRow partition,
+ int bucket) {
+ List<DataFileMeta> result = new ArrayList<>();
+ if (from.containsKey(partition) && from.get(partition).containsKey(bucket)) {
+ result.addAll(from.get(partition).get(bucket));
+ }
+ result.sort(Comparator.comparingLong(DataFileMeta::creationTimeEpochMillis));
+ return result;
+ }
+}
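The new operator buffers changelog file metadata per (partition, bucket) across elements and flushes it at each checkpoint barrier (and at end of input), sorted by file creation time. A self-contained sketch of this buffer-then-sorted-flush pattern, with simplified types and illustrative names:

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    class BufferThenSortedFlush {
        // TreeMap keeps buckets in ascending order, like the operator above.
        private final Map<Integer, List<Long>> creationTimesByBucket = new TreeMap<>();

        void onElement(int bucket, long creationTimeEpochMillis) {
            creationTimesByBucket
                    .computeIfAbsent(bucket, b -> new ArrayList<>())
                    .add(creationTimeEpochMillis);
        }

        // Invoked where the operator calls prepareSnapshotPreBarrier/endInput.
        Map<Integer, List<Long>> flush() {
            Map<Integer, List<Long>> emitted = new LinkedHashMap<>();
            for (Map.Entry<Integer, List<Long>> e : creationTimesByBucket.entrySet()) {
                List<Long> sorted = new ArrayList<>(e.getValue());
                sorted.sort(Comparator.naturalOrder());
                emitted.put(e.getKey(), sorted);
            }
            creationTimesByBucket.clear(); // state must not leak across checkpoints
            return emitted;
        }
    }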
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
index 7605af61c4..86ec97ddde 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java
@@ -40,7 +40,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
-import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -50,7 +49,6 @@ import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.function.BiConsumer;
@@ -148,37 +146,37 @@ public class ChangelogCompactTask implements Serializable {
Semaphore semaphore = new Semaphore((int) bufferSize.getBytes());
BlockingQueue<ReadTask> finishedTasks = new LinkedBlockingQueue<>();
- List<Future<?>> futures =
- ThreadPoolUtils.submitAllTasks(
- executor,
- t -> {
- // Why not create `finishedTasks` as a blocking queue and use it to
- // limit the total size of bytes awaiting to be copied? Because finished
- // tasks are added after their contents are read, so even if
- // `finishedTasks` is full, each thread can still read one more file,
- // and the limit will become `bytesInThreads + bufferSize`, not just
- // `bufferSize`.
- try {
- semaphore.acquire((int) t.meta.fileSize());
- t.readFully();
- finishedTasks.put(t);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- }
- },
- tasks);
+ ThreadPoolUtils.submitAllTasks(
+ executor,
+ t -> {
+ // Why not create `finishedTasks` as a blocking queue and use it to limit the
+ // total size of bytes awaiting to be copied? Because finished tasks are added
+ // after their contents are read, so even if `finishedTasks` is full, each
+ // thread can still read one more file, and the limit will become
+ // `bytesInThreads + bufferSize`, not just `bufferSize`.
+ try {
+ semaphore.acquire((int) t.meta.fileSize());
+ t.readFully();
+ finishedTasks.put(t);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ },
+ tasks);
OutputStream outputStream = new OutputStream();
List<Result> results = new ArrayList<>();
for (int i = 0; i < tasks.size(); i++) {
// copy all files into a new big file
ReadTask task = finishedTasks.take();
+ if (task.exception != null) {
+ throw task.exception;
+ }
write(task, outputStream, results);
semaphore.release((int) task.meta.fileSize());
}
outputStream.out.close();
- ThreadPoolUtils.awaitAllFutures(futures);
return produceNewCommittables(results, table, pathFactory, outputStream.path);
}
@@ -316,6 +314,7 @@ public class ChangelogCompactTask implements Serializable {
private final DataFileMeta meta;
private byte[] result = null;
+ private Exception exception = null;
private ReadTask(
FileStoreTable table,
@@ -334,8 +333,8 @@ public class ChangelogCompactTask implements Serializable {
try {
result = IOUtils.readFully(table.fileIO().newInputStream(path), true);
table.fileIO().deleteQuietly(path);
- } catch (IOException e) {
- throw new UncheckedIOException(e);
+ } catch (Exception e) {
+ exception = e;
}
}
}
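The deadlock this file fixes: a read task used to throw UncheckedIOException on its worker thread, so a failed task was never put into finishedTasks and the consumer's blocking take() waited forever. The task now records the failure, is always enqueued, and the consumer rethrows. A standalone sketch of that pattern, with illustrative names:

    import java.io.FileNotFoundException;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;

    class ReadTaskSketch {
        byte[] result;
        Exception exception;

        void readFully() {
            try {
                throw new FileNotFoundException("simulated read failure");
            } catch (Exception e) {
                exception = e; // never rethrow on the worker thread
            }
        }

        public static void main(String[] args) throws Exception {
            BlockingQueue<ReadTaskSketch> finishedTasks = new LinkedBlockingQueue<>();
            ExecutorService executor = Executors.newFixedThreadPool(1);
            ReadTaskSketch task = new ReadTaskSketch();
            // The worker enqueues the task even when reading failed;
            // otherwise the take() below would block forever.
            executor.execute(() -> {
                task.readFully();
                finishedTasks.add(task);
            });
            ReadTaskSketch done = finishedTasks.take();
            executor.shutdown();
            // The failure surfaces on the consumer thread, as in doCompact above.
            System.out.println("propagated: " + done.exception);
        }
    }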
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 3431f34cc5..6349dc3972 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.ChangelogProducer;
import org.apache.paimon.CoreOptions.TagCreationMode;
import org.apache.paimon.flink.compact.changelog.ChangelogCompactCoordinateOperator;
+import org.apache.paimon.flink.compact.changelog.ChangelogCompactSortOperator;
import org.apache.paimon.flink.compact.changelog.ChangelogCompactWorkerOperator;
import org.apache.paimon.flink.compact.changelog.ChangelogTaskTypeInfo;
import org.apache.paimon.manifest.ManifestCommittable;
@@ -248,7 +249,7 @@ public abstract class FlinkSink<T> implements Serializable {
written, options.get(SINK_WRITER_CPU), options.get(SINK_WRITER_MEMORY));
if (!table.primaryKeys().isEmpty() && options.get(PRECOMMIT_COMPACT)) {
- SingleOutputStreamOperator<Committable> newWritten =
+ SingleOutputStreamOperator<Committable> beforeSort =
written.transform(
"Changelog Compact Coordinator",
new EitherTypeInfo<>(
@@ -259,8 +260,15 @@ public abstract class FlinkSink<T> implements Serializable {
"Changelog Compact Worker",
new CommittableTypeInfo(),
new ChangelogCompactWorkerOperator(table));
- forwardParallelism(newWritten, written);
- written = newWritten;
+ forwardParallelism(beforeSort, written);
+
+ written =
+ beforeSort
+ .transform(
+ "Changelog Sort by Creation Time",
+ new CommittableTypeInfo(),
+ new ChangelogCompactSortOperator())
+ .forceNonParallel();
}
return written;
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
index 5958cc632e..2c9bbaffd0 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
@@ -1263,6 +1263,10 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
+ tableProperties
+ ")");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Creating test table with properties {}",
tableProperties);
+ }
+
// input data must be strictly ordered
tEnv.getConfig()
.getConfiguration()
@@ -1298,7 +1302,7 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
+ " 'fields.i.kind' = 'sequence',"
+ " 'fields.i.start' = '0',"
+ " 'fields.i.end' = '"
- + (usefulNumRows - 1) * factor
+ + (usefulNumRows * factor - 1)
+ "',"
+ " 'number-of-rows' = '"
+ usefulNumRows * factor
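The bound fix above corrects an off-by-factor error: a datagen sequence field produces every value from 'fields.i.start' to 'fields.i.end' inclusive (as Flink's datagen connector defines it), so it yields end - start + 1 rows, and that count must equal 'number-of-rows'. A quick arithmetic check with illustrative values:

    class SequenceBoundCheck {
        public static void main(String[] args) {
            int usefulNumRows = 10;
            int factor = 3;
            int numberOfRows = usefulNumRows * factor;        // 30 rows requested
            int oldEnd = (usefulNumRows - 1) * factor;        // 27 -> 0..27 is 28 values
            int newEnd = usefulNumRows * factor - 1;          // 29 -> 0..29 is 30 values
            System.out.println(oldEnd + 1 == numberOfRows);   // false: sequence too short
            System.out.println(newEnd + 1 == numberOfRows);   // true: bound matches
        }
    }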
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactSortOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactSortOperatorTest.java
new file mode 100644
index 0000000000..6b47256a5c
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactSortOperatorTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.compact.changelog;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.sink.CommittableTypeInfo;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Preconditions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.function.Function;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link ChangelogCompactSortOperator}. */
+public class ChangelogCompactSortOperatorTest {
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void testChangelogSorted() throws Exception {
+ ChangelogCompactSortOperator operator = new ChangelogCompactSortOperator();
+ OneInputStreamOperatorTestHarness<Committable, Committable> testHarness =
+ createTestHarness(operator);
+
+ Function<Integer, BinaryRow> binaryRow =
+ i -> {
+ BinaryRow row = new BinaryRow(1);
+ BinaryRowWriter writer = new BinaryRowWriter(row);
+ writer.writeInt(0, i);
+ writer.complete();
+ return row;
+ };
+
+ testHarness.open();
+
+ List<DataFileMeta> files = new ArrayList<>();
+ for (int i = 0; i <= 10; i++) {
+ files.add(createDataFileMeta(i, i * 100));
+ }
+
+ CommitMessageImpl onlyData =
+ new CommitMessageImpl(
+ binaryRow.apply(0),
+ 0,
+ 2,
+ new DataIncrement(
+ Arrays.asList(files.get(2), files.get(1)),
+ Collections.emptyList(),
+ Collections.emptyList()),
+ CompactIncrement.emptyIncrement());
+ testHarness.processElement(
+ new StreamRecord<>(new Committable(1, Committable.Kind.FILE, onlyData)));
+
+ CommitMessageImpl onlyChangelogBucket0 =
+ new CommitMessageImpl(
+ binaryRow.apply(0),
+ 0,
+ 2,
+ new DataIncrement(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Arrays.asList(files.get(4), files.get(3))),
+ CompactIncrement.emptyIncrement());
+ testHarness.processElement(
+ new StreamRecord<>(
+ new Committable(1, Committable.Kind.FILE, onlyChangelogBucket0)));
+
+ CommitMessageImpl onlyChangelogBucket1 =
+ new CommitMessageImpl(
+ binaryRow.apply(0),
+ 1,
+ 2,
+ new DataIncrement(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Arrays.asList(files.get(7), files.get(8))),
+ CompactIncrement.emptyIncrement());
+ testHarness.processElement(
+ new StreamRecord<>(
+ new Committable(1, Committable.Kind.FILE, onlyChangelogBucket1)));
+
+ CommitMessageImpl mixed =
+ new CommitMessageImpl(
+ binaryRow.apply(0),
+ 1,
+ 2,
+ new DataIncrement(
+ Arrays.asList(files.get(10), files.get(9)),
+ Collections.emptyList(),
+ Arrays.asList(files.get(6), files.get(5))),
+ CompactIncrement.emptyIncrement());
+ testHarness.processElement(
+ new StreamRecord<>(new Committable(1, Committable.Kind.FILE, mixed)));
+
+ testHarness.prepareSnapshotPreBarrier(1);
+
+ List<Object> output = new ArrayList<>(testHarness.getOutput());
+ assertThat(output).hasSize(4);
+
+ List<CommitMessageImpl> actual = new ArrayList<>();
+ for (Object o : output) {
+ actual.add(
+ (CommitMessageImpl)
+ ((StreamRecord<Committable>) o).getValue().wrappedCommittable());
+ }
+
+ assertThat(actual.get(0)).isEqualTo(onlyData);
+ assertThat(actual.get(1))
+ .isEqualTo(
+ new CommitMessageImpl(
+ binaryRow.apply(0),
+ 1,
+ 2,
+ new DataIncrement(
+ Arrays.asList(files.get(10), files.get(9)),
+ Collections.emptyList(),
+ Collections.emptyList()),
+ CompactIncrement.emptyIncrement()));
+ assertThat(actual.get(2))
+ .isEqualTo(
+ new CommitMessageImpl(
+ binaryRow.apply(0),
+ 0,
+ 2,
+ new DataIncrement(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Arrays.asList(files.get(3), files.get(4))),
+ CompactIncrement.emptyIncrement()));
+ assertThat(actual.get(3))
+ .isEqualTo(
+ new CommitMessageImpl(
+ binaryRow.apply(0),
+ 1,
+ 2,
+ new DataIncrement(
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Arrays.asList(
+ files.get(5),
+ files.get(6),
+ files.get(7),
+ files.get(8))),
+ CompactIncrement.emptyIncrement()));
+
+ testHarness.close();
+ }
+
+ private DataFileMeta createDataFileMeta(int mb, long creationMillis) {
+ return new DataFileMeta(
+ UUID.randomUUID().toString(),
+ MemorySize.ofMebiBytes(mb).getBytes(),
+ 0,
+ null,
+ null,
+ null,
+ null,
+ 0,
+ 0,
+ 0,
+ 0,
+ Collections.emptyList(),
+ Timestamp.fromEpochMillis(creationMillis),
+ null,
+ null,
+ null,
+ null,
+ null);
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ private OneInputStreamOperatorTestHarness<Committable, Committable> createTestHarness(ChangelogCompactSortOperator operator) throws Exception {
+ TypeSerializer serializer =
+ new CommittableTypeInfo().createSerializer(new ExecutionConfig());
+ OneInputStreamOperatorTestHarness harness =
+ new OneInputStreamOperatorTestHarness(operator, 1, 1, 0);
+ harness.getStreamConfig().setupNetworkInputs(Preconditions.checkNotNull(serializer));
+ harness.getStreamConfig().serializeAllConfigs();
+ harness.setup(serializer);
+ return harness;
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskTest.java
new file mode 100644
index 0000000000..fa567e5490
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTaskTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.compact.changelog;
+
+import org.apache.paimon.catalog.FileSystemCatalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.FileNotFoundException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link ChangelogCompactTask}. */
+public class ChangelogCompactTaskTest {
+
+ @TempDir java.nio.file.Path tempDir;
+
+ @Test
+ public void testExceptionWhenRead() throws Exception {
+ FileSystemCatalog catalog =
+ new FileSystemCatalog(LocalFileIO.create(), new Path(tempDir.toString()));
+ catalog.createDatabase("default", false);
+ catalog.createTable(
+ Identifier.create("default", "T"),
+ new Schema(
+ Arrays.asList(
+ new DataField(0, "k", DataTypes.INT()),
+ new DataField(1, "v", DataTypes.INT())),
+ Collections.emptyList(),
+ Collections.singletonList("k"),
+ new HashMap<>(),
+ ""),
+ false);
+
+ Map<Integer, List<DataFileMeta>> files = new HashMap<>();
+ files.put(
+ 0,
+ Collections.singletonList(
+ DataFileMeta.forAppend(
+ "unexisting-file",
+ 128,
+ 0,
+ SimpleStats.EMPTY_STATS,
+ 0,
+ 0,
+ 1,
+ Collections.emptyList(),
+ null,
+ null,
+ null,
+ null)));
+ ChangelogCompactTask task =
+ new ChangelogCompactTask(1, BinaryRow.EMPTY_ROW, 1, files, new HashMap<>());
+ assertThatThrownBy(
+ () ->
+ task.doCompact(
+ (FileStoreTable)
+ catalog.getTable(Identifier.create("default", "T")),
+ Executors.newFixedThreadPool(1),
+ MemorySize.ofMebiBytes(64)))
+ .isInstanceOf(FileNotFoundException.class)
+ .hasMessageContaining("unexisting-file");
+ }
+}