This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new abc3a84746 [flink] Enable data-evolution compaction on flink (#6875)
abc3a84746 is described below

commit abc3a84746542df4f1ca79954b991042a06717a7
Author: YeJunHao <[email protected]>
AuthorDate: Thu Dec 25 13:20:38 2025 +0800

    [flink] Enable data-evolution compaction on flink (#6875)
---
 .../DataEvolutionCompactCoordinator.java           |  11 ++
 .../dataevolution/DataEvolutionCompactTask.java    |   5 +-
 .../paimon/table/DataEvolutionTableTest.java       |  22 +++-
 .../apache/paimon/flink/action/CompactAction.java  |  19 ++-
 .../flink/compact/DataEvolutionTableCompact.java   |  82 ++++++++++++
 ...ataEvolutionCompactionTaskSimpleSerializer.java |  66 ++++++++++
 .../sink/DataEvolutionCompactionTaskTypeInfo.java  |  99 ++++++++++++++
 .../DataEvolutionCompactionWorkerOperator.java     | 102 +++++++++++++++
 .../flink/sink/DataEvolutionTableCompactSink.java  |  63 +++++++++
 .../source/DataEvolutionTableCompactSource.java    | 114 ++++++++++++++++
 .../paimon/flink/action/CompactActionITCase.java   | 119 +++++++++++++++++
 .../paimon/spark/procedure/CompactProcedure.java   | 144 +++++++++++----------
 12 files changed, 765 insertions(+), 81 deletions(-)
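
For orientation, here is a minimal sketch of how the new Flink pieces below are wired together, mirroring DataEvolutionTableCompact#build() from this patch. The `env` and `fileStoreTable` objects (and the table identifier string) are assumptions for illustration, not part of the commit:

    // Illustrative sketch only: assemble the data-evolution compaction pipeline.
    // Assumes an existing StreamExecutionEnvironment `env` and a FileStoreTable
    // `fileStoreTable` with data-evolution enabled.
    DataEvolutionTableCompact compact =
            new DataEvolutionTableCompact(env, "my_db.my_table", fileStoreTable);
    compact.withPartitionPredicate(null); // optional: restrict compaction to selected partitions
    compact.build();                      // plan (parallelism 1) -> rebalance -> compact -> commit
    env.execute("Compact data-evolution table my_db.my_table");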

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
 
b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
index 8aef7b0a6f..14bc83c547 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
@@ -26,6 +26,7 @@ import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.EndOfScanException;
 import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.utils.RangeHelper;
@@ -44,6 +45,7 @@ import java.util.TreeMap;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Compact coordinator to compact data evolution table. */
 public class DataEvolutionCompactCoordinator {
@@ -108,9 +110,14 @@ public class DataEvolutionCompactCoordinator {
                 List<ManifestEntry> targetEntries =
                         currentMetas.stream()
                                 .flatMap(meta -> 
snapshotReader.readManifest(meta).stream())
+                                // we don't need stats for compaction
+                                .map(ManifestEntry::copyWithoutStats)
                                 .collect(Collectors.toList());
                 result.addAll(targetEntries);
             }
+            if (result.isEmpty()) {
+                throw new EndOfScanException();
+            }
             return result;
         }
     }
@@ -199,6 +206,10 @@ public class DataEvolutionCompactCoordinator {
 
                     long weightSum = 0L;
                     for (List<DataFileMeta> fileGroup : groupedFiles) {
+                        checkArgument(
+                                rangeHelper.areAllRangesSame(fileGroup),
+                                "Data files %s should all have the same row id range.",
+                                dataFiles);
                         long currentGroupWeight =
                                 fileGroup.stream()
                                         .mapToLong(d -> Math.max(d.fileSize(), 
openFileCost))
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java
 
b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java
index fb25327254..d4da47bdc5 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java
@@ -80,7 +80,7 @@ public class DataEvolutionCompactTask {
         return blobTask;
     }
 
-    public CommitMessage doCompact(FileStoreTable table) throws Exception {
+    public CommitMessage doCompact(FileStoreTable table, String commitUser) throws Exception {
         if (blobTask) {
             // TODO: support blob file compaction
             throw new UnsupportedOperationException("Blob task is not supported");
@@ -107,8 +107,7 @@ public class DataEvolutionCompactTask {
                         .build();
         RecordReader<InternalRow> reader =
                 
store.newDataEvolutionRead().withReadType(readWriteType).createReader(dataSplit);
-        AppendFileStoreWrite storeWrite =
-                (AppendFileStoreWrite) store.newWrite("Compact-Data-Evolution");
+        AppendFileStoreWrite storeWrite = (AppendFileStoreWrite) store.newWrite(commitUser);
         storeWrite.withWriteType(readWriteType);
         RecordWriter<InternalRow> writer = storeWrite.createWriter(partition, 
0);
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
index 6a7efd013b..a1460257fa 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
@@ -56,6 +56,7 @@ import org.apache.paimon.table.sink.BatchWriteBuilder;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.EndOfScanException;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.types.DataField;
@@ -913,11 +914,15 @@ public class DataEvolutionTableTest extends TableTestBase 
{
         // Each plan() call processes one manifest group
         List<DataEvolutionCompactTask> allTasks = new ArrayList<>();
         List<DataEvolutionCompactTask> tasks;
-        while (!(tasks = coordinator.plan()).isEmpty() || allTasks.isEmpty()) {
-            allTasks.addAll(tasks);
-            if (tasks.isEmpty()) {
-                break;
+        try {
+            while (!(tasks = coordinator.plan()).isEmpty() || 
allTasks.isEmpty()) {
+                allTasks.addAll(tasks);
+                if (tasks.isEmpty()) {
+                    break;
+                }
             }
+        } catch (EndOfScanException ignore) {
+
         }
 
         // Verify no exceptions were thrown and tasks list is valid (may be 
empty)
@@ -940,10 +945,13 @@ public class DataEvolutionTableTest extends TableTestBase 
{
         // Each plan() call processes one manifest group
         List<CommitMessage> commitMessages = new ArrayList<>();
         List<DataEvolutionCompactTask> tasks;
-        while (!(tasks = coordinator.plan()).isEmpty()) {
-            for (DataEvolutionCompactTask task : tasks) {
-                commitMessages.add(task.doCompact(table));
+        try {
+            while (!(tasks = coordinator.plan()).isEmpty()) {
+                for (DataEvolutionCompactTask task : tasks) {
+                    commitMessages.add(task.doCompact(table, "test-commit"));
+                }
             }
+        } catch (EndOfScanException ignore) {
         }
 
         table.newBatchWriteBuilder().newCommit().commit(commitMessages);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 2e4442d9d7..59fa394b49 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.compact.AppendTableCompact;
+import org.apache.paimon.flink.compact.DataEvolutionTableCompact;
 import org.apache.paimon.flink.compact.IncrementalClusterCompact;
 import org.apache.paimon.flink.postpone.PostponeBucketCompactSplitSource;
 import 
org.apache.paimon.flink.postpone.RewritePostponeBucketCommittableOperator;
@@ -100,9 +101,6 @@ public class CompactAction extends TableActionBase {
                             "Only FileStoreTable supports compact action. The 
table type is '%s'.",
                             table.getClass().getName()));
         }
-        checkArgument(
-                !((FileStoreTable) table).coreOptions().dataEvolutionEnabled(),
-                "Compact action does not support data evolution table yet. ");
         HashMap<String, String> dynamicOptions = new HashMap<>(tableConf);
         dynamicOptions.put(CoreOptions.WRITE_ONLY.key(), "false");
         table = table.copy(dynamicOptions);
@@ -146,7 +144,10 @@ public class CompactAction extends TableActionBase {
         if (fileStoreTable.coreOptions().bucket() == 
BucketMode.POSTPONE_BUCKET) {
             buildForPostponeBucketCompaction(env, fileStoreTable, isStreaming);
         } else if (fileStoreTable.bucketMode() == BucketMode.BUCKET_UNAWARE) {
-            if (fileStoreTable.coreOptions().clusteringIncrementalEnabled()) {
+
+            if (fileStoreTable.coreOptions().dataEvolutionEnabled()) {
+                buildForDataEvolutionTableCompact(env, fileStoreTable, isStreaming);
+            } else if (fileStoreTable.coreOptions().clusteringIncrementalEnabled()) {
                 new IncrementalClusterCompact(
                                 env, fileStoreTable, partitionPredicate, 
fullCompaction)
                         .build();
@@ -205,6 +206,16 @@ public class CompactAction extends TableActionBase {
         builder.build();
     }
 
+    protected void buildForDataEvolutionTableCompact(
+            StreamExecutionEnvironment env, FileStoreTable table, boolean isStreaming)
+            throws Exception {
+        checkArgument(!isStreaming, "Data evolution table compaction only supports batch mode for now.");
+        DataEvolutionTableCompact builder =
+                new DataEvolutionTableCompact(env, identifier.getFullName(), table);
+        builder.withPartitionPredicate(getPartitionPredicate());
+        builder.build();
+    }
+
     protected PartitionPredicate getPartitionPredicate() throws Exception {
         checkArgument(
                 partitions == null || whereSql == null,
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/DataEvolutionTableCompact.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/DataEvolutionTableCompact.java
new file mode 100644
index 0000000000..802f3ffd87
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/DataEvolutionTableCompact.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.compact;
+
+import org.apache.paimon.append.dataevolution.DataEvolutionCompactTask;
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.sink.DataEvolutionTableCompactSink;
+import org.apache.paimon.flink.source.DataEvolutionTableCompactSource;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
+
+import javax.annotation.Nullable;
+
+/** Builder for the data-evolution table Flink compaction job. */
+public class DataEvolutionTableCompact {
+
+    private final transient StreamExecutionEnvironment env;
+    private final String tableIdentifier;
+    private final FileStoreTable table;
+
+    @Nullable private PartitionPredicate partitionPredicate;
+
+    public DataEvolutionTableCompact(
+            StreamExecutionEnvironment env, String tableIdentifier, 
FileStoreTable table) {
+        this.env = env;
+        this.tableIdentifier = tableIdentifier;
+        this.table = table;
+    }
+
+    public void withPartitionPredicate(PartitionPredicate partitionPredicate) {
+        this.partitionPredicate = partitionPredicate;
+    }
+
+    public void build() {
+        DataEvolutionTableCompactSource source =
+                new DataEvolutionTableCompactSource(table, partitionPredicate);
+        DataStreamSource<DataEvolutionCompactTask> sourceStream =
+                DataEvolutionTableCompactSource.buildSource(env, source, 
tableIdentifier);
+
+        sinkFromSource(sourceStream);
+    }
+
+    private void sinkFromSource(DataStreamSource<DataEvolutionCompactTask> 
input) {
+        Options conf = Options.fromMap(table.options());
+        Integer compactionWorkerParallelism =
+                
conf.get(FlinkConnectorOptions.UNAWARE_BUCKET_COMPACTION_PARALLELISM);
+        PartitionTransformation<DataEvolutionCompactTask> transformation =
+                new PartitionTransformation<>(
+                        input.getTransformation(), new 
RebalancePartitioner<>());
+        if (compactionWorkerParallelism != null) {
+            transformation.setParallelism(compactionWorkerParallelism);
+        } else {
+            transformation.setParallelism(env.getParallelism());
+        }
+
+        DataStream<DataEvolutionCompactTask> rebalanced = new 
DataStream<>(env, transformation);
+        DataEvolutionTableCompactSink.sink(table, rebalanced);
+    }
+}
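
A hedged usage note: the worker parallelism consumed in sinkFromSource(...) above is read from the table options, so a caller could override it with a dynamic option before building the job. The option constant exists in this module; the surrounding wiring and variable names are assumptions for illustration, not part of the patch (imports omitted):

    // Hypothetical override of the compaction worker parallelism.
    Map<String, String> dynamicOptions = new HashMap<>();
    dynamicOptions.put(FlinkConnectorOptions.UNAWARE_BUCKET_COMPACTION_PARALLELISM.key(), "4");
    FileStoreTable tuned = fileStoreTable.copy(dynamicOptions); // `fileStoreTable` assumed to exist
    new DataEvolutionTableCompact(env, "my_db.my_table", tuned).build();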
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DataEvolutionCompactionTaskSimpleSerializer.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DataEvolutionCompactionTaskSimpleSerializer.java
new file mode 100644
index 0000000000..4225581283
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DataEvolutionCompactionTaskSimpleSerializer.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.append.dataevolution.DataEvolutionCompactTask;
+import 
org.apache.paimon.append.dataevolution.DataEvolutionCompactTaskSerializer;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/** {@link SimpleVersionedSerializer} for {@link DataEvolutionCompactTask}. */
+public class DataEvolutionCompactionTaskSimpleSerializer
+        implements SimpleVersionedSerializer<DataEvolutionCompactTask> {
+
+    private final DataEvolutionCompactTaskSerializer compactionTaskSerializer;
+
+    public DataEvolutionCompactionTaskSimpleSerializer(
+            DataEvolutionCompactTaskSerializer compactionTaskSerializer) {
+        this.compactionTaskSerializer = compactionTaskSerializer;
+    }
+
+    @Override
+    public int getVersion() {
+        return 1;
+    }
+
+    @Override
+    public byte[] serialize(DataEvolutionCompactTask compactionTask) throws 
IOException {
+        byte[] wrapped = compactionTaskSerializer.serialize(compactionTask);
+        int version = compactionTaskSerializer.getVersion();
+
+        return ByteBuffer.allocate(wrapped.length + 
4).put(wrapped).putInt(version).array();
+    }
+
+    @Override
+    public DataEvolutionCompactTask deserialize(int compactionTaskVersion, 
byte[] bytes)
+            throws IOException {
+        if (compactionTaskVersion != getVersion()) {
+            throw new RuntimeException("Can not deserialize version: " + 
compactionTaskVersion);
+        }
+
+        ByteBuffer buffer = ByteBuffer.wrap(bytes);
+        byte[] wrapped = new byte[bytes.length - 4];
+        buffer.get(wrapped);
+        int version = buffer.getInt();
+        return compactionTaskSerializer.deserialize(version, wrapped);
+    }
+}
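
A quick round-trip sketch of the byte layout used above: serialize() writes the inner task bytes followed by a trailing 4-byte inner serializer version, and deserialize() peels them apart again. The `task` variable is assumed to be an existing DataEvolutionCompactTask:

    // Round-trip sketch (assumes an existing DataEvolutionCompactTask `task`).
    DataEvolutionCompactionTaskSimpleSerializer simple =
            new DataEvolutionCompactionTaskSimpleSerializer(new DataEvolutionCompactTaskSerializer());
    byte[] bytes = simple.serialize(task);  // layout: [task bytes][4-byte inner version]
    DataEvolutionCompactTask copy = simple.deserialize(simple.getVersion(), bytes);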
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DataEvolutionCompactionTaskTypeInfo.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DataEvolutionCompactionTaskTypeInfo.java
new file mode 100644
index 0000000000..75f79ca178
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DataEvolutionCompactionTaskTypeInfo.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.append.dataevolution.DataEvolutionCompactTask;
+import 
org.apache.paimon.append.dataevolution.DataEvolutionCompactTaskSerializer;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.serialization.SerializerConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/** Type information of {@link DataEvolutionCompactTask}. */
+public class DataEvolutionCompactionTaskTypeInfo extends 
TypeInformation<DataEvolutionCompactTask> {
+
+    @Override
+    public boolean isBasicType() {
+        return false;
+    }
+
+    @Override
+    public boolean isTupleType() {
+        return false;
+    }
+
+    @Override
+    public int getArity() {
+        return 1;
+    }
+
+    @Override
+    public int getTotalFields() {
+        return 1;
+    }
+
+    @Override
+    public Class<DataEvolutionCompactTask> getTypeClass() {
+        return DataEvolutionCompactTask.class;
+    }
+
+    @Override
+    public boolean isKeyType() {
+        return false;
+    }
+
+    /**
+     * Do not annotate with <code>@override</code> here to maintain 
compatibility with Flink 1.18-.
+     */
+    public TypeSerializer<DataEvolutionCompactTask> 
createSerializer(SerializerConfig config) {
+        return this.createSerializer((ExecutionConfig) null);
+    }
+
+    /**
+     * Do not annotate with <code>@override</code> here to maintain 
compatibility with Flink 2.0+.
+     */
+    public TypeSerializer<DataEvolutionCompactTask> 
createSerializer(ExecutionConfig config) {
+        // we don't need copy for task
+        return new 
NoneCopyVersionedSerializerTypeSerializerProxy<DataEvolutionCompactTask>(
+                () ->
+                        new DataEvolutionCompactionTaskSimpleSerializer(
+                                new DataEvolutionCompactTaskSerializer())) {};
+    }
+
+    @Override
+    public int hashCode() {
+        return 0;
+    }
+
+    @Override
+    public boolean canEqual(Object obj) {
+        return obj instanceof DataEvolutionCompactionTaskTypeInfo;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        return obj instanceof DataEvolutionCompactionTaskTypeInfo;
+    }
+
+    @Override
+    public String toString() {
+        return "DataEvolutionCompactionTask";
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DataEvolutionCompactionWorkerOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DataEvolutionCompactionWorkerOperator.java
new file mode 100644
index 0000000000..e5803d9aff
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DataEvolutionCompactionWorkerOperator.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.append.dataevolution.DataEvolutionCompactTask;
+import org.apache.paimon.flink.source.DataEvolutionTableCompactSource;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Operator to execute {@link DataEvolutionCompactTask} passed from {@link
+ * DataEvolutionTableCompactSource} for compacting single data-evolution table.
+ */
+public class DataEvolutionCompactionWorkerOperator
+        extends PrepareCommitOperator<DataEvolutionCompactTask, Committable> {
+
+    private final FileStoreTable fileStoreTable;
+    private final String commitUser;
+    private final List<Committable> committables;
+
+    private DataEvolutionCompactionWorkerOperator(
+            StreamOperatorParameters<Committable> parameters,
+            FileStoreTable table,
+            String commitUser) {
+        super(parameters, Options.fromMap(table.options()));
+        this.fileStoreTable = table;
+        this.commitUser = commitUser;
+        this.committables = new ArrayList<>();
+    }
+
+    @Override
+    public void processElement(StreamRecord<DataEvolutionCompactTask> element) 
throws Exception {
+        DataEvolutionCompactTask task = element.getValue();
+        committables.add(
+                new Committable(
+                        Long.MAX_VALUE,
+                        Committable.Kind.FILE,
+                        task.doCompact(fileStoreTable, commitUser)));
+    }
+
+    @Override
+    protected List<Committable> prepareCommit(boolean waitCompaction, long 
checkpointId)
+            throws IOException {
+        List<Committable> toCommit = new ArrayList<>(committables);
+        committables.clear();
+        return toCommit;
+    }
+
+    /** {@link StreamOperatorFactory} of {@link 
DataEvolutionCompactionWorkerOperator}. */
+    public static class Factory
+            extends PrepareCommitOperator.Factory<DataEvolutionCompactTask, 
Committable> {
+
+        private final FileStoreTable fileStoreTable;
+        private final String commitUser;
+
+        public Factory(FileStoreTable table, String initialCommitUser) {
+            super(Options.fromMap(table.options()));
+            this.fileStoreTable = table;
+            this.commitUser = initialCommitUser;
+        }
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public <T extends StreamOperator<Committable>> T createStreamOperator(
+                StreamOperatorParameters<Committable> parameters) {
+            return (T)
+                    new DataEvolutionCompactionWorkerOperator(
+                            parameters, fileStoreTable, commitUser);
+        }
+
+        @Override
+        @SuppressWarnings("rawtypes")
+        public Class<? extends StreamOperator> 
getStreamOperatorClass(ClassLoader classLoader) {
+            return DataEvolutionCompactionWorkerOperator.class;
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DataEvolutionTableCompactSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DataEvolutionTableCompactSink.java
new file mode 100644
index 0000000000..eb021be8dd
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DataEvolutionTableCompactSink.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.append.dataevolution.DataEvolutionCompactTask;
+import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** Compaction Sink for data-evolution table. */
+public class DataEvolutionTableCompactSink extends 
FlinkSink<DataEvolutionCompactTask> {
+
+    private final FileStoreTable table;
+
+    public DataEvolutionTableCompactSink(FileStoreTable table) {
+        super(table, true);
+        this.table = table;
+    }
+
+    public static DataStreamSink<?> sink(
+            FileStoreTable table, DataStream<DataEvolutionCompactTask> input) {
+        boolean isStreaming = isStreaming(input);
+        checkArgument(!isStreaming, "Data evolution compaction sink only supports batch mode for now.");
+        return new DataEvolutionTableCompactSink(table).sinkFrom(input);
+    }
+
+    @Override
+    protected OneInputStreamOperatorFactory<DataEvolutionCompactTask, 
Committable>
+            createWriteOperatorFactory(StoreSinkWrite.Provider writeProvider, 
String commitUser) {
+        return new DataEvolutionCompactionWorkerOperator.Factory(table, 
commitUser);
+    }
+
+    @Override
+    protected Committer.Factory<Committable, ManifestCommittable> 
createCommitterFactory() {
+        return context -> new StoreCommitter(table, 
table.newCommit(context.commitUser()), context);
+    }
+
+    @Override
+    protected CommittableStateManager<ManifestCommittable> 
createCommittableStateManager() {
+        return new NoopCommittableStateManager();
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataEvolutionTableCompactSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataEvolutionTableCompactSource.java
new file mode 100644
index 0000000000..b0f0ca7d15
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataEvolutionTableCompactSource.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source;
+
+import org.apache.paimon.append.dataevolution.DataEvolutionCompactCoordinator;
+import org.apache.paimon.append.dataevolution.DataEvolutionCompactTask;
+import org.apache.paimon.flink.sink.DataEvolutionCompactionTaskTypeInfo;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.EndOfScanException;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/** Source for data-evolution table Compaction. */
+public class DataEvolutionTableCompactSource
+        extends AbstractNonCoordinatedSource<DataEvolutionCompactTask> {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(DataEvolutionTableCompactSource.class);
+    private static final String COMPACTION_COORDINATOR_NAME =
+            "DataEvolution Compaction Coordinator";
+
+    private final FileStoreTable table;
+    private final PartitionPredicate partitionFilter;
+
+    public DataEvolutionTableCompactSource(
+            FileStoreTable table, @Nullable PartitionPredicate 
partitionFilter) {
+        this.table = table;
+        this.partitionFilter = partitionFilter;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.BOUNDED;
+    }
+
+    @Override
+    public SourceReader<DataEvolutionCompactTask, SimpleSourceSplit> 
createReader(
+            SourceReaderContext readerContext) throws Exception {
+        Preconditions.checkArgument(
+                readerContext.currentParallelism() == 1,
+                "Compaction Operator parallelism in paimon MUST be one.");
+        return new CompactSourceReader(table, partitionFilter);
+    }
+
+    /** Source reader that plans compaction tasks for a data-evolution table. */
+    public static class CompactSourceReader
+            extends 
AbstractNonCoordinatedSourceReader<DataEvolutionCompactTask> {
+        private final DataEvolutionCompactCoordinator compactionCoordinator;
+
+        public CompactSourceReader(FileStoreTable table, PartitionPredicate 
partitions) {
+            compactionCoordinator = new DataEvolutionCompactCoordinator(table, 
partitions, false);
+        }
+
+        @Override
+        public InputStatus pollNext(ReaderOutput<DataEvolutionCompactTask> 
readerOutput)
+                throws Exception {
+            try {
+                // do scan and plan action, emit data-evolution compaction 
tasks.
+                List<DataEvolutionCompactTask> tasks = 
compactionCoordinator.plan();
+                tasks.forEach(readerOutput::collect);
+            } catch (EndOfScanException esf) {
+                LOG.info("Catching EndOfScanException, the job is finished.");
+                return InputStatus.END_OF_INPUT;
+            }
+
+            return InputStatus.MORE_AVAILABLE;
+        }
+    }
+
+    public static DataStreamSource<DataEvolutionCompactTask> buildSource(
+            StreamExecutionEnvironment env,
+            DataEvolutionTableCompactSource source,
+            String tableIdentifier) {
+        return (DataStreamSource<DataEvolutionCompactTask>)
+                env.fromSource(
+                                source,
+                                WatermarkStrategy.noWatermarks(),
+                                COMPACTION_COORDINATOR_NAME + " : " + 
tableIdentifier,
+                                new DataEvolutionCompactionTaskTypeInfo())
+                        .setParallelism(1)
+                        .setMaxParallelism(1);
+    }
+}
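
As CompactSourceReader above shows, DataEvolutionCompactCoordinator#plan() now signals completion by throwing EndOfScanException rather than returning empty lists indefinitely. A caller outside Flink would drain it roughly as below; this is a sketch only, with `table` assumed to be an existing FileStoreTable and the constructor arguments mirroring the reader above:

    // Sketch: draining the coordinator until planning is finished.
    DataEvolutionCompactCoordinator coordinator =
            new DataEvolutionCompactCoordinator(table, null, false);
    List<DataEvolutionCompactTask> planned = new ArrayList<>();
    try {
        while (true) {
            planned.addAll(coordinator.plan()); // one manifest group per call
        }
    } catch (EndOfScanException e) {
        // planning finished; `planned` now holds all compaction tasks
    }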
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
index 95314d5f08..e0777d37b7 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
@@ -18,23 +18,37 @@
 
 package org.apache.paimon.flink.action;
 
+import org.apache.paimon.AppendOnlyFileStore;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.operation.BaseAppendFileStoreWrite;
+import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.table.sink.StreamWriteBuilder;
 import org.apache.paimon.table.sink.TableCommitImpl;
 import org.apache.paimon.table.sink.TableWriteImpl;
 import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.StreamTableScan;
 import org.apache.paimon.table.source.TableScan;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.CloseableIterator;
+import org.apache.paimon.utils.CommitIncrement;
+import org.apache.paimon.utils.RecordWriter;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TraceableFileIO;
 
@@ -56,6 +70,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
 
 import static org.apache.paimon.utils.CommonTestUtils.waitUtil;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -825,6 +840,110 @@ public class CompactActionITCase extends 
CompactActionITCaseBase {
                 60_000);
     }
 
+    @Test
+    public void testDataEvolutionTableCompact() throws Exception {
+        Map<String, String> tableOptions = new HashMap<>();
+        tableOptions.put(CoreOptions.BUCKET.key(), "-1");
+        tableOptions.put(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+        tableOptions.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+        tableOptions.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
+
+        FileStoreTable table =
+                prepareTable(
+                        Arrays.asList("k"),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        tableOptions);
+
+        BatchWriteBuilder builder = table.newBatchWriteBuilder();
+        BatchTableWrite write = table.newBatchWriteBuilder().newWrite();
+
+        for (int i = 0; i < 10000; i++) {
+            write.write(rowData(1, i, i, BinaryString.fromString("xxxxoooo" + 
i)));
+        }
+
+        builder.newCommit().commit(write.prepareCommit());
+
+        BaseAppendFileStoreWrite storeWrite =
+                ((AppendOnlyFileStore) table.store()).newWrite("test-compact");
+        storeWrite.withWriteType(table.rowType().project(Arrays.asList("v")));
+        RecordWriter<InternalRow> writer = 
storeWrite.createWriter(BinaryRow.singleColumn(1), 0);
+        for (int i = 10000; i < 20000; i++) {
+            writer.write(rowData(i));
+        }
+        CommitIncrement commitIncrement = writer.prepareCommit(false);
+        List<CommitMessage> commitMessages =
+                Arrays.asList(
+                        new CommitMessageImpl(
+                                BinaryRow.singleColumn(1),
+                                0,
+                                1,
+                                commitIncrement.newFilesIncrement(),
+                                commitIncrement.compactIncrement()));
+        setFirstRowId(commitMessages, 0L);
+        builder.newCommit().commit(commitMessages);
+
+        writer = storeWrite.createWriter(BinaryRow.singleColumn(1), 0);
+        for (int i = 20000; i < 30000; i++) {
+            writer.write(rowData(i));
+        }
+        commitIncrement = writer.prepareCommit(false);
+        commitMessages =
+                Arrays.asList(
+                        new CommitMessageImpl(
+                                BinaryRow.singleColumn(1),
+                                0,
+                                1,
+                                commitIncrement.newFilesIncrement(),
+                                commitIncrement.compactIncrement()));
+        setFirstRowId(commitMessages, 0L);
+        builder.newCommit().commit(commitMessages);
+
+        checkLatestSnapshot(table, 3, Snapshot.CommitKind.APPEND);
+
+        runAction(false, true, Collections.emptyList());
+
+        checkLatestSnapshot(table, 4, Snapshot.CommitKind.COMPACT);
+
+        List<DataSplit> splits = table.newSnapshotReader().read().dataSplits();
+        assertThat(splits.size()).isEqualTo(1);
+        List<DataFileMeta> fileMetas = splits.get(0).dataFiles();
+        assertThat(fileMetas.size()).isEqualTo(1);
+        assertThat(fileMetas.get(0).nonNullFirstRowId()).isEqualTo(0);
+        assertThat(fileMetas.get(0).rowCount()).isEqualTo(10000);
+
+        ReadBuilder readBuilder = table.newReadBuilder();
+        RecordReader<InternalRow> reader =
+                
readBuilder.newRead().createReader(readBuilder.newScan().plan());
+
+        int value = 20000;
+        try (CloseableIterator<InternalRow> iterator = 
reader.toCloseableIterator()) {
+            while (iterator.hasNext()) {
+                InternalRow row = iterator.next();
+                assertThat(row.getInt(1)).isEqualTo(value++);
+            }
+        }
+
+        assertThat(value).isEqualTo(30000);
+    }
+
+    private void setFirstRowId(List<CommitMessage> commitables, long 
firstRowId) {
+        commitables.forEach(
+                c -> {
+                    CommitMessageImpl commitMessage = (CommitMessageImpl) c;
+                    List<DataFileMeta> newFiles =
+                            new 
ArrayList<>(commitMessage.newFilesIncrement().newFiles());
+                    commitMessage.newFilesIncrement().newFiles().clear();
+                    commitMessage
+                            .newFilesIncrement()
+                            .newFiles()
+                            .addAll(
+                                    newFiles.stream()
+                                            .map(s -> 
s.assignFirstRowId(firstRowId))
+                                            .collect(Collectors.toList()));
+                });
+    }
+
     private void runAction(boolean isStreaming) throws Exception {
         runAction(isStreaming, false, Collections.emptyList());
     }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index d3e10d68f9..75a99fc9b3 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -490,79 +490,89 @@ public class CompactProcedure extends BaseProcedure {
         CommitMessageSerializer messageSerializerser = new 
CommitMessageSerializer();
         String commitUser = 
createCommitUser(table.coreOptions().toConfiguration());
         List<CommitMessage> messages = new ArrayList<>();
-        while (!(compactionTasks = compactCoordinator.plan()).isEmpty()) {
-            if (partitionIdleTime != null) {
-                SnapshotReader snapshotReader = table.newSnapshotReader();
-                if (partitionPredicate != null) {
-                    snapshotReader.withPartitionFilter(partitionPredicate);
+        try {
+            while (true) {
+                compactionTasks = compactCoordinator.plan();
+                if (partitionIdleTime != null) {
+                    SnapshotReader snapshotReader = table.newSnapshotReader();
+                    if (partitionPredicate != null) {
+                        snapshotReader.withPartitionFilter(partitionPredicate);
+                    }
+                    Map<BinaryRow, Long> partitionInfo =
+                            snapshotReader.partitionEntries().stream()
+                                    .collect(
+                                            Collectors.toMap(
+                                                    PartitionEntry::partition,
+                                                    
PartitionEntry::lastFileCreationTime));
+                    long historyMilli =
+                            LocalDateTime.now()
+                                    .minus(partitionIdleTime)
+                                    .atZone(ZoneId.systemDefault())
+                                    .toInstant()
+                                    .toEpochMilli();
+                    compactionTasks =
+                            compactionTasks.stream()
+                                    .filter(
+                                            task ->
+                                                    
partitionInfo.get(task.partition())
+                                                            <= historyMilli)
+                                    .collect(Collectors.toList());
                 }
-                Map<BinaryRow, Long> partitionInfo =
-                        snapshotReader.partitionEntries().stream()
-                                .collect(
-                                        Collectors.toMap(
-                                                PartitionEntry::partition,
-                                                
PartitionEntry::lastFileCreationTime));
-                long historyMilli =
-                        LocalDateTime.now()
-                                .minus(partitionIdleTime)
-                                .atZone(ZoneId.systemDefault())
-                                .toInstant()
-                                .toEpochMilli();
-                compactionTasks =
-                        compactionTasks.stream()
-                                .filter(task -> 
partitionInfo.get(task.partition()) <= historyMilli)
-                                .collect(Collectors.toList());
-            }
-            if (compactionTasks.isEmpty()) {
-                LOG.info("Task plan is empty, no compact job to execute.");
-                continue;
-            }
-
-            DataEvolutionCompactTaskSerializer serializer =
-                    new DataEvolutionCompactTaskSerializer();
-            List<byte[]> serializedTasks = new ArrayList<>();
-            try {
-                for (DataEvolutionCompactTask compactionTask : 
compactionTasks) {
-                    serializedTasks.add(serializer.serialize(compactionTask));
+                if (compactionTasks.isEmpty()) {
+                    LOG.info("Task plan is empty, no compact job to execute.");
+                    continue;
                 }
-            } catch (IOException e) {
-                throw new RuntimeException("serialize compaction task failed");
-            }
 
-            int readParallelism = readParallelism(serializedTasks, spark());
-            JavaRDD<byte[]> commitMessageJavaRDD =
-                    javaSparkContext
-                            .parallelize(serializedTasks, readParallelism)
-                            .mapPartitions(
-                                    (FlatMapFunction<Iterator<byte[]>, byte[]>)
-                                            taskIterator -> {
-                                                
DataEvolutionCompactTaskSerializer ser =
-                                                        new 
DataEvolutionCompactTaskSerializer();
-                                                List<byte[]> messagesBytes = 
new ArrayList<>();
-                                                CommitMessageSerializer 
messageSer =
-                                                        new 
CommitMessageSerializer();
-                                                while (taskIterator.hasNext()) 
{
-                                                    DataEvolutionCompactTask 
task =
-                                                            ser.deserialize(
-                                                                    
ser.getVersion(),
-                                                                    
taskIterator.next());
-                                                    messagesBytes.add(
-                                                            
messageSer.serialize(
-                                                                    
task.doCompact(table)));
-                                                }
-                                                return 
messagesBytes.iterator();
-                                            });
+                DataEvolutionCompactTaskSerializer serializer =
+                        new DataEvolutionCompactTaskSerializer();
+                List<byte[]> serializedTasks = new ArrayList<>();
+                try {
+                    for (DataEvolutionCompactTask compactionTask : 
compactionTasks) {
+                        
serializedTasks.add(serializer.serialize(compactionTask));
+                    }
+                } catch (IOException e) {
+                    throw new RuntimeException("serialize compaction task 
failed");
+                }
 
-            List<byte[]> serializedMessages = commitMessageJavaRDD.collect();
-            try {
-                for (byte[] serializedMessage : serializedMessages) {
-                    messages.add(
-                            messageSerializerser.deserialize(
-                                    messageSerializerser.getVersion(), 
serializedMessage));
+                int readParallelism = readParallelism(serializedTasks, 
spark());
+                JavaRDD<byte[]> commitMessageJavaRDD =
+                        javaSparkContext
+                                .parallelize(serializedTasks, readParallelism)
+                                .mapPartitions(
+                                        (FlatMapFunction<Iterator<byte[]>, 
byte[]>)
+                                                taskIterator -> {
+                                                    
DataEvolutionCompactTaskSerializer ser =
+                                                            new 
DataEvolutionCompactTaskSerializer();
+                                                    List<byte[]> messagesBytes 
= new ArrayList<>();
+                                                    CommitMessageSerializer 
messageSer =
+                                                            new 
CommitMessageSerializer();
+                                                    while 
(taskIterator.hasNext()) {
+                                                        
DataEvolutionCompactTask task =
+                                                                
ser.deserialize(
+                                                                        
ser.getVersion(),
+                                                                        
taskIterator.next());
+                                                        messagesBytes.add(
+                                                                
messageSer.serialize(
+                                                                        
task.doCompact(
+                                                                               
 table,
+                                                                               
 commitUser)));
+                                                    }
+                                                    return 
messagesBytes.iterator();
+                                                });
+
+                List<byte[]> serializedMessages = 
commitMessageJavaRDD.collect();
+                try {
+                    for (byte[] serializedMessage : serializedMessages) {
+                        messages.add(
+                                messageSerializerser.deserialize(
+                                        messageSerializerser.getVersion(), 
serializedMessage));
+                    }
+                } catch (Exception e) {
+                    throw new RuntimeException("Deserialize commit message 
failed", e);
                 }
-            } catch (Exception e) {
-                throw new RuntimeException("Deserialize commit message 
failed", e);
             }
+        } catch (EndOfScanException e) {
+            LOG.info("Catching EndOfScanException, the compact job is 
finishing.");
         }
 
         try (TableCommitImpl commit = table.newCommit(commitUser)) {
