This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new abc3a84746 [flink] Enable data-evolution compaction on flink (#6875)
abc3a84746 is described below
commit abc3a84746542df4f1ca79954b991042a06717a7
Author: YeJunHao <[email protected]>
AuthorDate: Thu Dec 25 13:20:38 2025 +0800
[flink] Enable data-evolution compaction on flink (#6875)
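
    As a rough usage sketch (flag names follow the existing compact action;
    the jar path and bracketed placeholders are illustrative, not taken from
    this commit), the new code path is reached by running the batch compact
    action against a table that has data evolution enabled:

        <FLINK_HOME>/bin/flink run \
            /path/to/paimon-flink-action-<version>.jar \
            compact \
            --warehouse <warehouse-path> \
            --database <database-name> \
            --table <table-name> \
            [--partition <partition-spec>]

    Streaming mode is rejected by a checkArgument in CompactAction, so the
    action must be submitted as a batch job.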
---
.../DataEvolutionCompactCoordinator.java | 11 ++
.../dataevolution/DataEvolutionCompactTask.java | 5 +-
.../paimon/table/DataEvolutionTableTest.java | 22 +++-
.../apache/paimon/flink/action/CompactAction.java | 19 ++-
.../flink/compact/DataEvolutionTableCompact.java | 82 ++++++++++++
...ataEvolutionCompactionTaskSimpleSerializer.java | 66 ++++++++++
.../sink/DataEvolutionCompactionTaskTypeInfo.java | 99 ++++++++++++++
.../DataEvolutionCompactionWorkerOperator.java | 102 +++++++++++++++
.../flink/sink/DataEvolutionTableCompactSink.java | 63 +++++++++
.../source/DataEvolutionTableCompactSource.java | 114 ++++++++++++++++
.../paimon/flink/action/CompactActionITCase.java | 119 +++++++++++++++++
.../paimon/spark/procedure/CompactProcedure.java | 144 +++++++++++----------
12 files changed, 765 insertions(+), 81 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
index 8aef7b0a6f..14bc83c547 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactCoordinator.java
@@ -26,6 +26,7 @@ import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.table.source.ScanMode;
import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.utils.RangeHelper;
@@ -44,6 +45,7 @@ import java.util.TreeMap;
import java.util.stream.Collectors;
import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Compact coordinator to compact data evolution table. */
public class DataEvolutionCompactCoordinator {
@@ -108,9 +110,14 @@ public class DataEvolutionCompactCoordinator {
List<ManifestEntry> targetEntries =
currentMetas.stream()
                            .flatMap(meta -> snapshotReader.readManifest(meta).stream())
+ // we don't need stats for compaction
+ .map(ManifestEntry::copyWithoutStats)
.collect(Collectors.toList());
result.addAll(targetEntries);
}
+ if (result.isEmpty()) {
+ throw new EndOfScanException();
+ }
return result;
}
}
@@ -199,6 +206,10 @@ public class DataEvolutionCompactCoordinator {
long weightSum = 0L;
for (List<DataFileMeta> fileGroup : groupedFiles) {
+ checkArgument(
+ rangeHelper.areAllRangesSame(fileGroup),
+                    "Data files %s should all have the same row id range.",
+ dataFiles);
long currentGroupWeight =
fileGroup.stream()
                            .mapToLong(d -> Math.max(d.fileSize(), openFileCost))
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java
index fb25327254..d4da47bdc5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/dataevolution/DataEvolutionCompactTask.java
@@ -80,7 +80,7 @@ public class DataEvolutionCompactTask {
return blobTask;
}
- public CommitMessage doCompact(FileStoreTable table) throws Exception {
+    public CommitMessage doCompact(FileStoreTable table, String commitUser) throws Exception {
if (blobTask) {
// TODO: support blob file compaction
            throw new UnsupportedOperationException("Blob task is not supported");
@@ -107,8 +107,7 @@ public class DataEvolutionCompactTask {
.build();
RecordReader<InternalRow> reader =
store.newDataEvolutionRead().withReadType(readWriteType).createReader(dataSplit);
-        AppendFileStoreWrite storeWrite =
-                (AppendFileStoreWrite) store.newWrite("Compact-Data-Evolution");
+        AppendFileStoreWrite storeWrite = (AppendFileStoreWrite) store.newWrite(commitUser);
storeWrite.withWriteType(readWriteType);
        RecordWriter<InternalRow> writer = storeWrite.createWriter(partition, 0);
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
index 6a7efd013b..a1460257fa 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/DataEvolutionTableTest.java
@@ -56,6 +56,7 @@ import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.types.DataField;
@@ -913,11 +914,15 @@ public class DataEvolutionTableTest extends TableTestBase {
// Each plan() call processes one manifest group
List<DataEvolutionCompactTask> allTasks = new ArrayList<>();
List<DataEvolutionCompactTask> tasks;
- while (!(tasks = coordinator.plan()).isEmpty() || allTasks.isEmpty()) {
- allTasks.addAll(tasks);
- if (tasks.isEmpty()) {
- break;
+ try {
+            while (!(tasks = coordinator.plan()).isEmpty() || allTasks.isEmpty()) {
+ allTasks.addAll(tasks);
+ if (tasks.isEmpty()) {
+ break;
+ }
}
+        } catch (EndOfScanException ignore) {
+
}
        // Verify no exceptions were thrown and tasks list is valid (may be empty)
@@ -940,10 +945,13 @@ public class DataEvolutionTableTest extends TableTestBase {
// Each plan() call processes one manifest group
List<CommitMessage> commitMessages = new ArrayList<>();
List<DataEvolutionCompactTask> tasks;
- while (!(tasks = coordinator.plan()).isEmpty()) {
- for (DataEvolutionCompactTask task : tasks) {
- commitMessages.add(task.doCompact(table));
+ try {
+ while (!(tasks = coordinator.plan()).isEmpty()) {
+ for (DataEvolutionCompactTask task : tasks) {
+ commitMessages.add(task.doCompact(table, "test-commit"));
+ }
}
+ } catch (EndOfScanException ignore) {
}
table.newBatchWriteBuilder().newCommit().commit(commitMessages);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 2e4442d9d7..59fa394b49 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.compact.AppendTableCompact;
+import org.apache.paimon.flink.compact.DataEvolutionTableCompact;
import org.apache.paimon.flink.compact.IncrementalClusterCompact;
import org.apache.paimon.flink.postpone.PostponeBucketCompactSplitSource;
import org.apache.paimon.flink.postpone.RewritePostponeBucketCommittableOperator;
@@ -100,9 +101,6 @@ public class CompactAction extends TableActionBase {
"Only FileStoreTable supports compact action. The
table type is '%s'.",
table.getClass().getName()));
}
- checkArgument(
- !((FileStoreTable) table).coreOptions().dataEvolutionEnabled(),
- "Compact action does not support data evolution table yet. ");
HashMap<String, String> dynamicOptions = new HashMap<>(tableConf);
dynamicOptions.put(CoreOptions.WRITE_ONLY.key(), "false");
table = table.copy(dynamicOptions);
@@ -146,7 +144,10 @@ public class CompactAction extends TableActionBase {
        if (fileStoreTable.coreOptions().bucket() == BucketMode.POSTPONE_BUCKET) {
buildForPostponeBucketCompaction(env, fileStoreTable, isStreaming);
} else if (fileStoreTable.bucketMode() == BucketMode.BUCKET_UNAWARE) {
- if (fileStoreTable.coreOptions().clusteringIncrementalEnabled()) {
+
+ if (fileStoreTable.coreOptions().dataEvolutionEnabled()) {
+                buildForDataEvolutionTableCompact(env, fileStoreTable, isStreaming);
+            } else if (fileStoreTable.coreOptions().clusteringIncrementalEnabled()) {
new IncrementalClusterCompact(
                                env, fileStoreTable, partitionPredicate, fullCompaction)
.build();
@@ -205,6 +206,16 @@ public class CompactAction extends TableActionBase {
builder.build();
}
+ protected void buildForDataEvolutionTableCompact(
+            StreamExecutionEnvironment env, FileStoreTable table, boolean isStreaming)
+            throws Exception {
+        checkArgument(!isStreaming, "Data evolution table compaction only supports batch mode for now.");
+        DataEvolutionTableCompact builder =
+                new DataEvolutionTableCompact(env, identifier.getFullName(), table);
+ builder.withPartitionPredicate(getPartitionPredicate());
+ builder.build();
+ }
+
protected PartitionPredicate getPartitionPredicate() throws Exception {
checkArgument(
partitions == null || whereSql == null,
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/DataEvolutionTableCompact.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/DataEvolutionTableCompact.java
new file mode 100644
index 0000000000..802f3ffd87
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/DataEvolutionTableCompact.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.compact;
+
+import org.apache.paimon.append.dataevolution.DataEvolutionCompactTask;
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.sink.DataEvolutionTableCompactSink;
+import org.apache.paimon.flink.source.DataEvolutionTableCompactSource;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.PartitionTransformation;
+import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
+
+import javax.annotation.Nullable;
+
+/** Builder for the data-evolution table Flink compaction job. */
+public class DataEvolutionTableCompact {
+
+ private final transient StreamExecutionEnvironment env;
+ private final String tableIdentifier;
+ private final FileStoreTable table;
+
+ @Nullable private PartitionPredicate partitionPredicate;
+
+ public DataEvolutionTableCompact(
+            StreamExecutionEnvironment env, String tableIdentifier, FileStoreTable table) {
+ this.env = env;
+ this.tableIdentifier = tableIdentifier;
+ this.table = table;
+ }
+
+ public void withPartitionPredicate(PartitionPredicate partitionPredicate) {
+ this.partitionPredicate = partitionPredicate;
+ }
+
+ public void build() {
+ DataEvolutionTableCompactSource source =
+ new DataEvolutionTableCompactSource(table, partitionPredicate);
+ DataStreamSource<DataEvolutionCompactTask> sourceStream =
+                DataEvolutionTableCompactSource.buildSource(env, source, tableIdentifier);
+
+ sinkFromSource(sourceStream);
+ }
+
+    private void sinkFromSource(DataStreamSource<DataEvolutionCompactTask> input) {
+ Options conf = Options.fromMap(table.options());
+ Integer compactionWorkerParallelism =
+                conf.get(FlinkConnectorOptions.UNAWARE_BUCKET_COMPACTION_PARALLELISM);
+ PartitionTransformation<DataEvolutionCompactTask> transformation =
+ new PartitionTransformation<>(
+                        input.getTransformation(), new RebalancePartitioner<>());
+ if (compactionWorkerParallelism != null) {
+ transformation.setParallelism(compactionWorkerParallelism);
+ } else {
+ transformation.setParallelism(env.getParallelism());
+ }
+
+        DataStream<DataEvolutionCompactTask> rebalanced = new DataStream<>(env, transformation);
+ DataEvolutionTableCompactSink.sink(table, rebalanced);
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DataEvolutionCompactionTaskSimpleSerializer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DataEvolutionCompactionTaskSimpleSerializer.java
new file mode 100644
index 0000000000..4225581283
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DataEvolutionCompactionTaskSimpleSerializer.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.append.dataevolution.DataEvolutionCompactTask;
+import org.apache.paimon.append.dataevolution.DataEvolutionCompactTaskSerializer;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/** {@link SimpleVersionedSerializer} for {@link DataEvolutionCompactTask}. */
+public class DataEvolutionCompactionTaskSimpleSerializer
+ implements SimpleVersionedSerializer<DataEvolutionCompactTask> {
+
+ private final DataEvolutionCompactTaskSerializer compactionTaskSerializer;
+
+ public DataEvolutionCompactionTaskSimpleSerializer(
+ DataEvolutionCompactTaskSerializer compactionTaskSerializer) {
+ this.compactionTaskSerializer = compactionTaskSerializer;
+ }
+
+ @Override
+ public int getVersion() {
+ return 1;
+ }
+
+ @Override
+    public byte[] serialize(DataEvolutionCompactTask compactionTask) throws IOException {
+ byte[] wrapped = compactionTaskSerializer.serialize(compactionTask);
+ int version = compactionTaskSerializer.getVersion();
+
+        return ByteBuffer.allocate(wrapped.length + 4).put(wrapped).putInt(version).array();
+ }
+
+ @Override
+    public DataEvolutionCompactTask deserialize(int compactionTaskVersion, byte[] bytes)
+ throws IOException {
+ if (compactionTaskVersion != getVersion()) {
+            throw new RuntimeException("Can not deserialize version: " + compactionTaskVersion);
+ }
+
+ ByteBuffer buffer = ByteBuffer.wrap(bytes);
+ byte[] wrapped = new byte[bytes.length - 4];
+ buffer.get(wrapped);
+ int version = buffer.getInt();
+ return compactionTaskSerializer.deserialize(version, wrapped);
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DataEvolutionCompactionTaskTypeInfo.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DataEvolutionCompactionTaskTypeInfo.java
new file mode 100644
index 0000000000..75f79ca178
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DataEvolutionCompactionTaskTypeInfo.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.append.dataevolution.DataEvolutionCompactTask;
+import org.apache.paimon.append.dataevolution.DataEvolutionCompactTaskSerializer;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.serialization.SerializerConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/** Type information of {@link DataEvolutionCompactTask}. */
+public class DataEvolutionCompactionTaskTypeInfo extends TypeInformation<DataEvolutionCompactTask> {
+
+ @Override
+ public boolean isBasicType() {
+ return false;
+ }
+
+ @Override
+ public boolean isTupleType() {
+ return false;
+ }
+
+ @Override
+ public int getArity() {
+ return 1;
+ }
+
+ @Override
+ public int getTotalFields() {
+ return 1;
+ }
+
+ @Override
+ public Class<DataEvolutionCompactTask> getTypeClass() {
+ return DataEvolutionCompactTask.class;
+ }
+
+ @Override
+ public boolean isKeyType() {
+ return false;
+ }
+
+ /**
+     * Do not annotate with <code>@override</code> here to maintain compatibility with Flink 1.18-.
+ */
+    public TypeSerializer<DataEvolutionCompactTask> createSerializer(SerializerConfig config) {
+ return this.createSerializer((ExecutionConfig) null);
+ }
+
+ /**
+     * Do not annotate with <code>@override</code> here to maintain compatibility with Flink 2.0+.
+ */
+    public TypeSerializer<DataEvolutionCompactTask> createSerializer(ExecutionConfig config) {
+ // we don't need copy for task
+        return new NoneCopyVersionedSerializerTypeSerializerProxy<DataEvolutionCompactTask>(
+ () ->
+ new DataEvolutionCompactionTaskSimpleSerializer(
+ new DataEvolutionCompactTaskSerializer())) {};
+ }
+
+ @Override
+ public int hashCode() {
+ return 0;
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof DataEvolutionCompactionTaskTypeInfo;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj instanceof DataEvolutionCompactionTaskTypeInfo;
+ }
+
+ @Override
+ public String toString() {
+ return "DataEvolutionCompactionTask";
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DataEvolutionCompactionWorkerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DataEvolutionCompactionWorkerOperator.java
new file mode 100644
index 0000000000..e5803d9aff
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DataEvolutionCompactionWorkerOperator.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.append.dataevolution.DataEvolutionCompactTask;
+import org.apache.paimon.flink.source.DataEvolutionTableCompactSource;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Operator to execute {@link DataEvolutionCompactTask} passed from {@link
+ * DataEvolutionTableCompactSource} for compacting a single data-evolution table.
+ */
+public class DataEvolutionCompactionWorkerOperator
+ extends PrepareCommitOperator<DataEvolutionCompactTask, Committable> {
+
+ private final FileStoreTable fileStoreTable;
+ private final String commitUser;
+ private final List<Committable> committables;
+
+ private DataEvolutionCompactionWorkerOperator(
+ StreamOperatorParameters<Committable> parameters,
+ FileStoreTable table,
+ String commitUser) {
+ super(parameters, Options.fromMap(table.options()));
+ this.fileStoreTable = table;
+ this.commitUser = commitUser;
+ this.committables = new ArrayList<>();
+ }
+
+ @Override
+    public void processElement(StreamRecord<DataEvolutionCompactTask> element) throws Exception {
+ DataEvolutionCompactTask task = element.getValue();
+ committables.add(
+ new Committable(
+ Long.MAX_VALUE,
+ Committable.Kind.FILE,
+ task.doCompact(fileStoreTable, commitUser)));
+ }
+
+ @Override
+    protected List<Committable> prepareCommit(boolean waitCompaction, long checkpointId)
+ throws IOException {
+ List<Committable> toCommit = new ArrayList<>(committables);
+ committables.clear();
+ return toCommit;
+ }
+
+    /** {@link StreamOperatorFactory} of {@link DataEvolutionCompactionWorkerOperator}. */
+ public static class Factory
+            extends PrepareCommitOperator.Factory<DataEvolutionCompactTask, Committable> {
+
+ private final FileStoreTable fileStoreTable;
+ private final String commitUser;
+
+ public Factory(FileStoreTable table, String initialCommitUser) {
+ super(Options.fromMap(table.options()));
+ this.fileStoreTable = table;
+ this.commitUser = initialCommitUser;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+        public <T extends StreamOperator<Committable>> T createStreamOperator(
+                StreamOperatorParameters<Committable> parameters) {
+ return (T)
+                    new DataEvolutionCompactionWorkerOperator(
+                            parameters, fileStoreTable, commitUser);
+ }
+
+ @Override
+ @SuppressWarnings("rawtypes")
+        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
+ return DataEvolutionCompactionWorkerOperator.class;
+ }
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DataEvolutionTableCompactSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DataEvolutionTableCompactSink.java
new file mode 100644
index 0000000000..eb021be8dd
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/DataEvolutionTableCompactSink.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.append.dataevolution.DataEvolutionCompactTask;
+import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** Compaction Sink for data-evolution table. */
+public class DataEvolutionTableCompactSink extends FlinkSink<DataEvolutionCompactTask> {
+
+ private final FileStoreTable table;
+
+ public DataEvolutionTableCompactSink(FileStoreTable table) {
+ super(table, true);
+ this.table = table;
+ }
+
+ public static DataStreamSink<?> sink(
+ FileStoreTable table, DataStream<DataEvolutionCompactTask> input) {
+ boolean isStreaming = isStreaming(input);
+        checkArgument(!isStreaming, "Data evolution compaction sink only supports batch mode for now.");
+ return new DataEvolutionTableCompactSink(table).sinkFrom(input);
+ }
+
+ @Override
+    protected OneInputStreamOperatorFactory<DataEvolutionCompactTask, Committable>
+            createWriteOperatorFactory(StoreSinkWrite.Provider writeProvider, String commitUser) {
+        return new DataEvolutionCompactionWorkerOperator.Factory(table, commitUser);
+ }
+
+ @Override
+    protected Committer.Factory<Committable, ManifestCommittable> createCommitterFactory() {
+        return context -> new StoreCommitter(table, table.newCommit(context.commitUser()), context);
+ }
+
+ @Override
+    protected CommittableStateManager<ManifestCommittable> createCommittableStateManager() {
+ return new NoopCommittableStateManager();
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataEvolutionTableCompactSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataEvolutionTableCompactSource.java
new file mode 100644
index 0000000000..b0f0ca7d15
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataEvolutionTableCompactSource.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source;
+
+import org.apache.paimon.append.dataevolution.DataEvolutionCompactCoordinator;
+import org.apache.paimon.append.dataevolution.DataEvolutionCompactTask;
+import org.apache.paimon.flink.sink.DataEvolutionCompactionTaskTypeInfo;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.EndOfScanException;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/** Source for data-evolution table compaction. */
+public class DataEvolutionTableCompactSource
+ extends AbstractNonCoordinatedSource<DataEvolutionCompactTask> {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(DataEvolutionTableCompactSource.class);
+ private static final String COMPACTION_COORDINATOR_NAME =
+ "DataEvolution Compaction Coordinator";
+
+ private final FileStoreTable table;
+ private final PartitionPredicate partitionFilter;
+
+ public DataEvolutionTableCompactSource(
+            FileStoreTable table, @Nullable PartitionPredicate partitionFilter) {
+ this.table = table;
+ this.partitionFilter = partitionFilter;
+ }
+
+ @Override
+ public Boundedness getBoundedness() {
+ return Boundedness.BOUNDED;
+ }
+
+ @Override
+    public SourceReader<DataEvolutionCompactTask, SimpleSourceSplit> createReader(
+ SourceReaderContext readerContext) throws Exception {
+ Preconditions.checkArgument(
+ readerContext.currentParallelism() == 1,
+ "Compaction Operator parallelism in paimon MUST be one.");
+ return new CompactSourceReader(table, partitionFilter);
+ }
+
+    /** Source reader for data-evolution compaction tasks. */
+ public static class CompactSourceReader
+            extends AbstractNonCoordinatedSourceReader<DataEvolutionCompactTask> {
+ private final DataEvolutionCompactCoordinator compactionCoordinator;
+
+        public CompactSourceReader(FileStoreTable table, PartitionPredicate partitions) {
+            compactionCoordinator = new DataEvolutionCompactCoordinator(table, partitions, false);
+ }
+
+ @Override
+        public InputStatus pollNext(ReaderOutput<DataEvolutionCompactTask> readerOutput)
+ throws Exception {
+ try {
+                // do scan and plan action, emit data-evolution compaction tasks.
+                List<DataEvolutionCompactTask> tasks = compactionCoordinator.plan();
+ tasks.forEach(readerOutput::collect);
+ } catch (EndOfScanException esf) {
+ LOG.info("Catching EndOfScanException, the job is finished.");
+ return InputStatus.END_OF_INPUT;
+ }
+
+ return InputStatus.MORE_AVAILABLE;
+ }
+ }
+
+ public static DataStreamSource<DataEvolutionCompactTask> buildSource(
+ StreamExecutionEnvironment env,
+ DataEvolutionTableCompactSource source,
+ String tableIdentifier) {
+ return (DataStreamSource<DataEvolutionCompactTask>)
+ env.fromSource(
+ source,
+ WatermarkStrategy.noWatermarks(),
+                                COMPACTION_COORDINATOR_NAME + " : " + tableIdentifier,
+ new DataEvolutionCompactionTaskTypeInfo())
+ .setParallelism(1)
+ .setMaxParallelism(1);
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
index 95314d5f08..e0777d37b7 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
@@ -18,23 +18,37 @@
package org.apache.paimon.flink.action;
+import org.apache.paimon.AppendOnlyFileStore;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.operation.BaseAppendFileStoreWrite;
+import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.CloseableIterator;
+import org.apache.paimon.utils.CommitIncrement;
+import org.apache.paimon.utils.RecordWriter;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TraceableFileIO;
@@ -56,6 +70,7 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
import static org.apache.paimon.utils.CommonTestUtils.waitUtil;
import static org.assertj.core.api.Assertions.assertThat;
@@ -825,6 +840,110 @@ public class CompactActionITCase extends CompactActionITCaseBase {
60_000);
}
+ @Test
+ public void testDataEvolutionTableCompact() throws Exception {
+ Map<String, String> tableOptions = new HashMap<>();
+ tableOptions.put(CoreOptions.BUCKET.key(), "-1");
+ tableOptions.put(CoreOptions.DATA_EVOLUTION_ENABLED.key(), "true");
+ tableOptions.put(CoreOptions.ROW_TRACKING_ENABLED.key(), "true");
+ tableOptions.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
+
+ FileStoreTable table =
+ prepareTable(
+ Arrays.asList("k"),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ tableOptions);
+
+ BatchWriteBuilder builder = table.newBatchWriteBuilder();
+ BatchTableWrite write = table.newBatchWriteBuilder().newWrite();
+
+ for (int i = 0; i < 10000; i++) {
+            write.write(rowData(1, i, i, BinaryString.fromString("xxxxoooo" + i)));
+ }
+
+ builder.newCommit().commit(write.prepareCommit());
+
+ BaseAppendFileStoreWrite storeWrite =
+ ((AppendOnlyFileStore) table.store()).newWrite("test-compact");
+ storeWrite.withWriteType(table.rowType().project(Arrays.asList("v")));
+        RecordWriter<InternalRow> writer = storeWrite.createWriter(BinaryRow.singleColumn(1), 0);
+ for (int i = 10000; i < 20000; i++) {
+ writer.write(rowData(i));
+ }
+ CommitIncrement commitIncrement = writer.prepareCommit(false);
+ List<CommitMessage> commitMessages =
+ Arrays.asList(
+ new CommitMessageImpl(
+ BinaryRow.singleColumn(1),
+ 0,
+ 1,
+ commitIncrement.newFilesIncrement(),
+ commitIncrement.compactIncrement()));
+ setFirstRowId(commitMessages, 0L);
+ builder.newCommit().commit(commitMessages);
+
+ writer = storeWrite.createWriter(BinaryRow.singleColumn(1), 0);
+ for (int i = 20000; i < 30000; i++) {
+ writer.write(rowData(i));
+ }
+ commitIncrement = writer.prepareCommit(false);
+ commitMessages =
+ Arrays.asList(
+ new CommitMessageImpl(
+ BinaryRow.singleColumn(1),
+ 0,
+ 1,
+ commitIncrement.newFilesIncrement(),
+ commitIncrement.compactIncrement()));
+ setFirstRowId(commitMessages, 0L);
+ builder.newCommit().commit(commitMessages);
+
+ checkLatestSnapshot(table, 3, Snapshot.CommitKind.APPEND);
+
+ runAction(false, true, Collections.emptyList());
+
+ checkLatestSnapshot(table, 4, Snapshot.CommitKind.COMPACT);
+
+ List<DataSplit> splits = table.newSnapshotReader().read().dataSplits();
+ assertThat(splits.size()).isEqualTo(1);
+ List<DataFileMeta> fileMetas = splits.get(0).dataFiles();
+ assertThat(fileMetas.size()).isEqualTo(1);
+ assertThat(fileMetas.get(0).nonNullFirstRowId()).isEqualTo(0);
+ assertThat(fileMetas.get(0).rowCount()).isEqualTo(10000);
+
+ ReadBuilder readBuilder = table.newReadBuilder();
+ RecordReader<InternalRow> reader =
+                readBuilder.newRead().createReader(readBuilder.newScan().plan());
+
+ int value = 20000;
+        try (CloseableIterator<InternalRow> iterator = reader.toCloseableIterator()) {
+ while (iterator.hasNext()) {
+ InternalRow row = iterator.next();
+ assertThat(row.getInt(1)).isEqualTo(value++);
+ }
+ }
+
+ assertThat(value).isEqualTo(30000);
+ }
+
+    private void setFirstRowId(List<CommitMessage> committables, long firstRowId) {
+        committables.forEach(
+ c -> {
+ CommitMessageImpl commitMessage = (CommitMessageImpl) c;
+ List<DataFileMeta> newFiles =
+                            new ArrayList<>(commitMessage.newFilesIncrement().newFiles());
+ commitMessage.newFilesIncrement().newFiles().clear();
+ commitMessage
+ .newFilesIncrement()
+ .newFiles()
+ .addAll(
+ newFiles.stream()
+                                            .map(s -> s.assignFirstRowId(firstRowId))
+ .collect(Collectors.toList()));
+ });
+ }
+
private void runAction(boolean isStreaming) throws Exception {
runAction(isStreaming, false, Collections.emptyList());
}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index d3e10d68f9..75a99fc9b3 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -490,79 +490,89 @@ public class CompactProcedure extends BaseProcedure {
        CommitMessageSerializer messageSerializerser = new CommitMessageSerializer();
        String commitUser = createCommitUser(table.coreOptions().toConfiguration());
List<CommitMessage> messages = new ArrayList<>();
- while (!(compactionTasks = compactCoordinator.plan()).isEmpty()) {
- if (partitionIdleTime != null) {
- SnapshotReader snapshotReader = table.newSnapshotReader();
- if (partitionPredicate != null) {
- snapshotReader.withPartitionFilter(partitionPredicate);
+ try {
+ while (true) {
+ compactionTasks = compactCoordinator.plan();
+ if (partitionIdleTime != null) {
+ SnapshotReader snapshotReader = table.newSnapshotReader();
+ if (partitionPredicate != null) {
+ snapshotReader.withPartitionFilter(partitionPredicate);
+ }
+ Map<BinaryRow, Long> partitionInfo =
+ snapshotReader.partitionEntries().stream()
+ .collect(
+ Collectors.toMap(
+ PartitionEntry::partition,
+                                                PartitionEntry::lastFileCreationTime));
+ long historyMilli =
+ LocalDateTime.now()
+ .minus(partitionIdleTime)
+ .atZone(ZoneId.systemDefault())
+ .toInstant()
+ .toEpochMilli();
+ compactionTasks =
+ compactionTasks.stream()
+ .filter(
+ task ->
+                                                partitionInfo.get(task.partition())
+ <= historyMilli)
+ .collect(Collectors.toList());
}
- Map<BinaryRow, Long> partitionInfo =
- snapshotReader.partitionEntries().stream()
- .collect(
- Collectors.toMap(
- PartitionEntry::partition,
-                                                PartitionEntry::lastFileCreationTime));
- long historyMilli =
- LocalDateTime.now()
- .minus(partitionIdleTime)
- .atZone(ZoneId.systemDefault())
- .toInstant()
- .toEpochMilli();
- compactionTasks =
- compactionTasks.stream()
-                            .filter(task -> partitionInfo.get(task.partition()) <= historyMilli)
- .collect(Collectors.toList());
- }
- if (compactionTasks.isEmpty()) {
- LOG.info("Task plan is empty, no compact job to execute.");
- continue;
- }
-
- DataEvolutionCompactTaskSerializer serializer =
- new DataEvolutionCompactTaskSerializer();
- List<byte[]> serializedTasks = new ArrayList<>();
- try {
-            for (DataEvolutionCompactTask compactionTask : compactionTasks) {
- serializedTasks.add(serializer.serialize(compactionTask));
+ if (compactionTasks.isEmpty()) {
+ LOG.info("Task plan is empty, no compact job to execute.");
+ continue;
}
- } catch (IOException e) {
- throw new RuntimeException("serialize compaction task failed");
- }
- int readParallelism = readParallelism(serializedTasks, spark());
- JavaRDD<byte[]> commitMessageJavaRDD =
- javaSparkContext
- .parallelize(serializedTasks, readParallelism)
- .mapPartitions(
- (FlatMapFunction<Iterator<byte[]>, byte[]>)
- taskIterator -> {
-
DataEvolutionCompactTaskSerializer ser =
- new
DataEvolutionCompactTaskSerializer();
- List<byte[]> messagesBytes =
new ArrayList<>();
- CommitMessageSerializer
messageSer =
- new
CommitMessageSerializer();
- while (taskIterator.hasNext())
{
- DataEvolutionCompactTask
task =
- ser.deserialize(
-
ser.getVersion(),
-
taskIterator.next());
- messagesBytes.add(
-
messageSer.serialize(
-
task.doCompact(table)));
- }
- return
messagesBytes.iterator();
- });
+ DataEvolutionCompactTaskSerializer serializer =
+ new DataEvolutionCompactTaskSerializer();
+ List<byte[]> serializedTasks = new ArrayList<>();
+ try {
+                    for (DataEvolutionCompactTask compactionTask : compactionTasks) {
+                        serializedTasks.add(serializer.serialize(compactionTask));
+ }
+ } catch (IOException e) {
+                    throw new RuntimeException("serialize compaction task failed");
+ }
- List<byte[]> serializedMessages = commitMessageJavaRDD.collect();
- try {
- for (byte[] serializedMessage : serializedMessages) {
- messages.add(
- messageSerializerser.deserialize(
-                            messageSerializerser.getVersion(), serializedMessage));
+                int readParallelism = readParallelism(serializedTasks, spark());
+ JavaRDD<byte[]> commitMessageJavaRDD =
+ javaSparkContext
+ .parallelize(serializedTasks, readParallelism)
+ .mapPartitions(
+ (FlatMapFunction<Iterator<byte[]>,
byte[]>)
+ taskIterator -> {
+
DataEvolutionCompactTaskSerializer ser =
+ new
DataEvolutionCompactTaskSerializer();
+ List<byte[]> messagesBytes
= new ArrayList<>();
+ CommitMessageSerializer
messageSer =
+ new
CommitMessageSerializer();
+ while
(taskIterator.hasNext()) {
+
DataEvolutionCompactTask task =
+
ser.deserialize(
+
ser.getVersion(),
+
taskIterator.next());
+ messagesBytes.add(
+
messageSer.serialize(
+
task.doCompact(
+
table,
+
commitUser)));
+ }
+ return
messagesBytes.iterator();
+ });
+
+                List<byte[]> serializedMessages = commitMessageJavaRDD.collect();
+ try {
+ for (byte[] serializedMessage : serializedMessages) {
+ messages.add(
+ messageSerializerser.deserialize(
+                                        messageSerializerser.getVersion(), serializedMessage));
+ }
+ } catch (Exception e) {
+                    throw new RuntimeException("Deserialize commit message failed", e);
}
- } catch (Exception e) {
-            throw new RuntimeException("Deserialize commit message failed", e);
}
+ } catch (EndOfScanException e) {
+            LOG.info("Caught EndOfScanException, the compact job is finishing.");
}
try (TableCommitImpl commit = table.newCommit(commitUser)) {