This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 9b6d43d2bd [flink] support performing incremental clustering by flink (#6395)
9b6d43d2bd is described below
commit 9b6d43d2bd2eeae54623218be1796690a9a41afa
Author: LsomeYeah <[email protected]>
AuthorDate: Tue Oct 14 16:00:37 2025 +0800
[flink] support performing incremental clustering by flink (#6395)
---
.../content/append-table/incremental-clustering.md | 49 ++-
.../append/cluster/IncrementalClusterManager.java | 3 +-
.../apache/paimon/flink/action/CompactAction.java | 153 +++++++-
.../cluster/IncrementalClusterSplitSource.java | 118 ++++++
.../cluster/RemoveClusterBeforeFilesOperator.java | 58 +++
...writeIncrementalClusterCommittableOperator.java | 112 ++++++
.../action/IncrementalClusterActionITCase.java | 413 +++++++++++++++++++++
.../paimon/spark/procedure/CompactProcedure.java | 2 +-
8 files changed, 896 insertions(+), 12 deletions(-)
diff --git a/docs/content/append-table/incremental-clustering.md b/docs/content/append-table/incremental-clustering.md
index 72f24ec17e..1cb08cbbdb 100644
--- a/docs/content/append-table/incremental-clustering.md
+++ b/docs/content/append-table/incremental-clustering.md
@@ -95,11 +95,14 @@ clustering and small-file merging must be performed exclusively via Incremental
## Run Incremental Clustering
{{< hint info >}}
-Currently, only support running Incremental Clustering in spark, support for flink will be added in the near future.
+Running Incremental Clustering is only supported in batch mode.
{{< /hint >}}
-To run a Incremental Clustering job, follow these instructions.
+To run an Incremental Clustering job, follow these instructions.
+
+You don’t need to specify any clustering-related parameters when running Incremental Clustering;
+these options are already defined as table options. If you need to change clustering settings, update the corresponding table options (see the sketch after the tabs below).
{{< tabs "incremental-clustering" >}}
@@ -117,8 +120,46 @@ CALL sys.compact(table => 'T')
-- run incremental clustering with full mode, this will recluster all data
CALL sys.compact(table => 'T', compact_strategy => 'full')
```
-You don’t need to specify any clustering-related parameters when running Incremental Clustering,
-these are already defined as table options. If you need to change clustering settings, please update the corresponding table options.
+{{< /tab >}}
+
+{{< tab "Flink Action" >}}
+
+Run the following command to submit an incremental clustering job for the table.
+
+```bash
+<FLINK_HOME>/bin/flink run \
+ /path/to/paimon-flink-action-{{< version >}}.jar \
+ compact \
+ --warehouse <warehouse-path> \
+ --database <database-name> \
+ --table <table-name> \
+ [--compact_strategy <minor / full>] \
+ [--table_conf <table_conf>] \
+  [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
+```
+
+Example: run incremental clustering
+
+```bash
+<FLINK_HOME>/bin/flink run \
+ /path/to/paimon-flink-action-{{< version >}}.jar \
+ compact \
+ --warehouse s3:///path/to/warehouse \
+ --database test_db \
+ --table test_table \
+ --table_conf sink.parallelism=2 \
+ --compact_strategy minor \
+ --catalog_conf s3.endpoint=https://****.com \
+ --catalog_conf s3.access-key=***** \
+ --catalog_conf s3.secret-key=*****
+```
+* `--compact_strategy` Determines how to pick files to be clustered; the default is `minor`.
+    * `full` : All files will be selected for clustering.
+    * `minor` : Pick the set of files that need to be clustered based on specified conditions.
+
+Note: write parallelism is set by `sink.parallelism`; if it is set too high, it may generate a large number of small files.
+
+You can use `-D execution.runtime-mode=batch` or `-yD execution.runtime-mode=batch` (for the ON-YARN scenario) to run in batch mode.
{{< /tab >}}
{{< /tabs >}}
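
For reference, the table options mentioned above are ordinary Paimon table options. Below is a minimal Java sketch of defining them at table creation, mirroring the options used by the `IncrementalClusterActionITCase` added in this commit; the class name, column names, and the `zorder` strategy value are illustrative, not prescriptive.

```java
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataTypes;

public class ClusteredTableSchemaSketch {
    // Clustering settings live in table options, so the compact action itself
    // needs no clustering flags. Values mirror the new ITCase in this commit.
    public static Schema clusteredSchema() {
        Schema.Builder schemaBuilder = Schema.newBuilder();
        schemaBuilder.column("a", DataTypes.INT());
        schemaBuilder.column("b", DataTypes.INT());
        schemaBuilder.column("pt", DataTypes.INT());
        schemaBuilder.partitionKeys("pt");
        schemaBuilder.option("bucket", "-1");                    // unaware-bucket append table
        schemaBuilder.option("clustering.columns", "a,b");       // columns to cluster by
        schemaBuilder.option("clustering.strategy", "zorder");   // strategy used in the ITCase
        schemaBuilder.option("clustering.incremental", "true");  // enable incremental clustering
        return schemaBuilder.build();
    }
}
```

Changing these settings later means updating the same `clustering.*` table options (for example through your engine's `ALTER TABLE ... SET` support) rather than passing flags to the compact action.
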
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
index 887150577c..407f6b17ef 100644
---
a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
+++
b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
@@ -226,7 +226,8 @@ public class IncrementalClusterManager {
return splits;
}
- public List<DataFileMeta> upgrade(List<DataFileMeta> filesAfterCluster,
int outputLevel) {
+ public static List<DataFileMeta> upgrade(
+ List<DataFileMeta> filesAfterCluster, int outputLevel) {
return filesAfterCluster.stream()
.map(file -> file.upgrade(outputLevel))
.collect(Collectors.toList());
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 602d3a59a5..86259582fa 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -19,9 +19,13 @@
package org.apache.paimon.flink.action;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.append.cluster.IncrementalClusterManager;
+import org.apache.paimon.compact.CompactUnit;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.cluster.IncrementalClusterSplitSource;
+import
org.apache.paimon.flink.cluster.RewriteIncrementalClusterCommittableOperator;
import org.apache.paimon.flink.compact.AppendTableCompactBuilder;
import org.apache.paimon.flink.postpone.PostponeBucketCompactSplitSource;
import
org.apache.paimon.flink.postpone.RewritePostponeBucketCommittableOperator;
@@ -32,7 +36,10 @@ import org.apache.paimon.flink.sink.CompactorSinkBuilder;
import org.apache.paimon.flink.sink.FixedBucketSink;
import org.apache.paimon.flink.sink.FlinkSinkBuilder;
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
+import org.apache.paimon.flink.sink.RowAppendTableSink;
import org.apache.paimon.flink.sink.RowDataChannelComputer;
+import org.apache.paimon.flink.sorter.TableSortInfo;
+import org.apache.paimon.flink.sorter.TableSorter;
import org.apache.paimon.flink.source.CompactorSourceBuilder;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.options.Options;
@@ -43,6 +50,7 @@ import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.predicate.PredicateProjectionConverter;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.Pair;
@@ -68,6 +76,7 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import static
org.apache.paimon.partition.PartitionPredicate.createBinaryPartitions;
import static
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
@@ -101,10 +110,6 @@ public class CompactAction extends TableActionBase {
checkArgument(
!((FileStoreTable) table).coreOptions().dataEvolutionEnabled(),
"Compact action does not support data evolution table yet. ");
- checkArgument(
- !(((FileStoreTable) table).bucketMode() ==
BucketMode.BUCKET_UNAWARE
- && ((FileStoreTable)
table).coreOptions().clusteringIncrementalEnabled()),
- "The table has enabled incremental clustering, and do not
support compact in flink yet.");
HashMap<String, String> dynamicOptions = new HashMap<>(tableConf);
dynamicOptions.put(CoreOptions.WRITE_ONLY.key(), "false");
table = table.copy(dynamicOptions);
@@ -148,8 +153,12 @@ public class CompactAction extends TableActionBase {
if (fileStoreTable.coreOptions().bucket() ==
BucketMode.POSTPONE_BUCKET) {
return buildForPostponeBucketCompaction(env, fileStoreTable,
isStreaming);
} else if (fileStoreTable.bucketMode() == BucketMode.BUCKET_UNAWARE) {
- buildForAppendTableCompact(env, fileStoreTable, isStreaming);
- return true;
+ if (fileStoreTable.coreOptions().clusteringIncrementalEnabled()) {
+ return buildForIncrementalClustering(env, fileStoreTable,
isStreaming);
+ } else {
+ buildForAppendTableCompact(env, fileStoreTable, isStreaming);
+ return true;
+ }
} else {
buildForBucketedTableCompact(env, fileStoreTable, isStreaming);
return true;
@@ -203,6 +212,138 @@ public class CompactAction extends TableActionBase {
builder.build();
}
+ private boolean buildForIncrementalClustering(
+ StreamExecutionEnvironment env, FileStoreTable table, boolean
isStreaming) {
+ checkArgument(!isStreaming, "Incremental clustering currently only
supports batch mode");
+ checkArgument(
+ partitions == null,
+ "Incremental clustering currently does not support specifying
partitions");
+ checkArgument(
+ whereSql == null, "Incremental clustering currently does not
support predicates");
+
+ IncrementalClusterManager incrementalClusterManager = new
IncrementalClusterManager(table);
+
+ // non-full strategy as default for incremental clustering
+ if (fullCompaction == null) {
+ fullCompaction = false;
+ }
+ Options options = new Options(table.options());
+ int localSampleMagnification =
table.coreOptions().getLocalSampleMagnification();
+ if (localSampleMagnification < 20) {
+ throw new IllegalArgumentException(
+ String.format(
+ "the config '%s=%d' should not be set too small,
greater than or equal to 20 is needed.",
+
CoreOptions.SORT_COMPACTION_SAMPLE_MAGNIFICATION.key(),
+ localSampleMagnification));
+ }
+ String commitUser = CoreOptions.createCommitUser(options);
+ InternalRowPartitionComputer partitionComputer =
+ new InternalRowPartitionComputer(
+ table.coreOptions().partitionDefaultName(),
+ table.store().partitionType(),
+ table.partitionKeys().toArray(new String[0]),
+ table.coreOptions().legacyPartitionName());
+
+ // 1. pick cluster files for each partition
+ Map<BinaryRow, CompactUnit> compactUnits =
+ incrementalClusterManager.prepareForCluster(fullCompaction);
+ if (compactUnits.isEmpty()) {
+ LOGGER.info(
+ "No partition needs to be incrementally clustered. "
+                            + "Please set '--compact_strategy full' if you need to forcibly trigger clustering.");
+ if (this.forceStartFlinkJob) {
+ env.fromSequence(0, 0)
+ .name("Nothing to Cluster Source")
+ .sinkTo(new DiscardingSink<>());
+ return true;
+ } else {
+ return false;
+ }
+ }
+ Map<BinaryRow, DataSplit[]> partitionSplits =
+ compactUnits.entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ Map.Entry::getKey,
+ entry ->
+ incrementalClusterManager
+ .toSplits(
+ entry.getKey(),
+
entry.getValue().files())
+ .toArray(new
DataSplit[0])));
+
+        // 2. read, sort and write per partition
+ List<DataStream<Committable>> dataStreams = new ArrayList<>();
+
+ for (Map.Entry<BinaryRow, DataSplit[]> entry :
partitionSplits.entrySet()) {
+ DataSplit[] splits = entry.getValue();
+ LinkedHashMap<String, String> partitionSpec =
+ partitionComputer.generatePartValues(entry.getKey());
+ // 2.1 generate source for current partition
+ Pair<DataStream<RowData>, DataStream<Committable>> sourcePair =
+ IncrementalClusterSplitSource.buildSource(
+ env,
+ table,
+ partitionSpec,
+ splits,
+
options.get(FlinkConnectorOptions.SCAN_PARALLELISM));
+
+ // 2.2 cluster in partition
+ Integer sinkParallelism =
options.get(FlinkConnectorOptions.SINK_PARALLELISM);
+ if (sinkParallelism == null) {
+ sinkParallelism = sourcePair.getLeft().getParallelism();
+ }
+ TableSortInfo sortInfo =
+ new TableSortInfo.Builder()
+
.setSortColumns(incrementalClusterManager.clusterKeys())
+
.setSortStrategy(incrementalClusterManager.clusterCurve())
+ .setSinkParallelism(sinkParallelism)
+ .setLocalSampleSize(sinkParallelism *
localSampleMagnification)
+ .setGlobalSampleSize(sinkParallelism * 1000)
+ .setRangeNumber(sinkParallelism * 10)
+ .build();
+ DataStream<RowData> sorted =
+ TableSorter.getSorter(env, sourcePair.getLeft(), table,
sortInfo).sort();
+
+ // 2.3 write and then reorganize the committable
+            // pass a null parallelism so that doWrite() forwards the upstream parallelism
+ RowAppendTableSink sink = new RowAppendTableSink(table, null,
null, null);
+ boolean blobAsDescriptor = table.coreOptions().blobAsDescriptor();
+ DataStream<Committable> clusterCommittable =
+ sink.doWrite(
+ FlinkSinkBuilder.mapToInternalRow(
+ sorted,
+ table.rowType(),
+ blobAsDescriptor,
+
table.catalogEnvironment().catalogContext()),
+ commitUser,
+ null)
+ .transform(
+ "Rewrite cluster committable",
+ new CommittableTypeInfo(),
+ new
RewriteIncrementalClusterCommittableOperator(
+ table,
+ compactUnits.entrySet().stream()
+ .collect(
+ Collectors.toMap(
+
Map.Entry::getKey,
+ unit ->
+
unit.getValue()
+
.outputLevel()))));
+ dataStreams.add(clusterCommittable);
+ dataStreams.add(sourcePair.getRight());
+ }
+
+ // 3. commit
+ RowAppendTableSink sink = new RowAppendTableSink(table, null, null,
null);
+ DataStream<Committable> dataStream = dataStreams.get(0);
+ for (int i = 1; i < dataStreams.size(); i++) {
+ dataStream = dataStream.union(dataStreams.get(i));
+ }
+ sink.doCommit(dataStream, commitUser);
+ return true;
+ }
+
protected PartitionPredicate getPartitionPredicate() throws Exception {
checkArgument(
partitions == null || whereSql == null,
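
The new `buildForIncrementalClustering` path above is driven end-to-end by the ITCase added later in this patch. Below is a condensed sketch of that invocation; it assumes a test class extending `ActionITCaseBase` (which provides `createAction(...)`, `streamExecutionEnvironmentBuilder()`, and the `warehouse`/`database`/`tableName` fields used in the test). Outside of tests, the action jar is submitted as shown in the documentation section above.

```java
// Condensed from IncrementalClusterActionITCase#runAction in this commit;
// assumes the surrounding class extends ActionITCaseBase.
StreamExecutionEnvironment env =
        streamExecutionEnvironmentBuilder().batchMode().build();
String[] args = {
    "compact",
    "--warehouse", warehouse,
    "--database", database,
    "--table", tableName,
    "--compact_strategy", "full" // or "minor" (the default)
};
CompactAction action = createAction(CompactAction.class, args);
action.withStreamExecutionEnvironment(env);
action.run();
```
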
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java
new file mode 100644
index 0000000000..eae1d3b8e9
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.cluster;
+
+import org.apache.paimon.flink.LogicalTypeConversion;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.sink.CommittableTypeInfo;
+import org.apache.paimon.flink.source.AbstractNonCoordinatedSource;
+import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader;
+import org.apache.paimon.flink.source.SimpleSourceSplit;
+import org.apache.paimon.flink.source.operator.ReadOperator;
+import org.apache.paimon.flink.utils.JavaTypeInfo;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.utils.Pair;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+/** Source for Incremental Clustering. */
+public class IncrementalClusterSplitSource extends
AbstractNonCoordinatedSource<Split> {
+ private static final long serialVersionUID = 1L;
+
+ private final Split[] splits;
+
+ public IncrementalClusterSplitSource(Split[] splits) {
+ this.splits = splits;
+ }
+
+ @Override
+ public Boundedness getBoundedness() {
+ return Boundedness.BOUNDED;
+ }
+
+ @Override
+ public SourceReader<Split, SimpleSourceSplit>
createReader(SourceReaderContext readerContext)
+ throws Exception {
+ return new IncrementalClusterSplitSource.Reader();
+ }
+
+ private class Reader extends AbstractNonCoordinatedSourceReader<Split> {
+
+ @Override
+ public InputStatus pollNext(ReaderOutput<Split> output) throws
Exception {
+ for (Split split : splits) {
+ DataSplit dataSplit = (DataSplit) split;
+ output.collect(dataSplit);
+ }
+ return InputStatus.END_OF_INPUT;
+ }
+ }
+
+ public static Pair<DataStream<RowData>, DataStream<Committable>>
buildSource(
+ StreamExecutionEnvironment env,
+ FileStoreTable table,
+ Map<String, String> partitionSpec,
+ DataSplit[] splits,
+ @Nullable Integer parallelism) {
+ DataStreamSource<Split> source =
+ env.fromSource(
+ new IncrementalClusterSplitSource(splits),
+ WatermarkStrategy.noWatermarks(),
+ String.format(
+ "Incremental-cluster split generator: %s - %s",
+ table.fullName(), partitionSpec),
+ new JavaTypeInfo<>(Split.class));
+
+ if (parallelism != null) {
+ source.setParallelism(parallelism);
+ }
+
+ return Pair.of(
+ new DataStream<>(source.getExecutionEnvironment(),
source.getTransformation())
+ .transform(
+ String.format(
+ "Incremental-cluster reader: %s - %s",
+ table.fullName(), partitionSpec),
+ InternalTypeInfo.of(
+
LogicalTypeConversion.toLogicalType(table.rowType())),
+ new ReadOperator(table::newRead, null, null)),
+ source.forward()
+ .transform(
+ "Remove files to be clustered",
+ new CommittableTypeInfo(),
+ new RemoveClusterBeforeFilesOperator())
+ .forceNonParallel());
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RemoveClusterBeforeFilesOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RemoveClusterBeforeFilesOperator.java
new file mode 100644
index 0000000000..907a69ba12
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RemoveClusterBeforeFilesOperator.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.cluster;
+
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.utils.BoundedOneInputOperator;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.Split;
+
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.Collections;
+
+/** Operator used with {@link IncrementalClusterSplitSource}, to remove files
to be clustered. */
+public class RemoveClusterBeforeFilesOperator extends
BoundedOneInputOperator<Split, Committable> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void processElement(StreamRecord<Split> element) throws Exception {
+ DataSplit dataSplit = (DataSplit) element.getValue();
+ CommitMessageImpl message =
+ new CommitMessageImpl(
+ dataSplit.partition(),
+ dataSplit.bucket(),
+ dataSplit.totalBuckets(),
+ DataIncrement.emptyIncrement(),
+ new CompactIncrement(
+ dataSplit.dataFiles(),
+ Collections.emptyList(),
+ Collections.emptyList()));
+ output.collect(
+ new StreamRecord<>(
+ new Committable(Long.MAX_VALUE, Committable.Kind.FILE,
message)));
+ }
+
+ @Override
+ public void endInput() throws Exception {}
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RewriteIncrementalClusterCommittableOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RewriteIncrementalClusterCommittableOperator.java
new file mode 100644
index 0000000000..6b0ef47f3f
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RewriteIncrementalClusterCommittableOperator.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.cluster;
+
+import org.apache.paimon.append.cluster.IncrementalClusterManager;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.utils.BoundedOneInputOperator;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** Rewrite committables for new files written after clustering. */
+public class RewriteIncrementalClusterCommittableOperator
+ extends BoundedOneInputOperator<Committable, Committable> {
+ private static final long serialVersionUID = 1L;
+
+ private final FileStoreTable table;
+ private final Map<BinaryRow, Integer> outputLevels;
+
+ private transient Map<BinaryRow, List<DataFileMeta>> partitionFiles;
+
+ public RewriteIncrementalClusterCommittableOperator(
+ FileStoreTable table, Map<BinaryRow, Integer> outputLevels) {
+ this.table = table;
+ this.outputLevels = outputLevels;
+ }
+
+ @Override
+ public void open() throws Exception {
+ partitionFiles = new HashMap<>();
+ }
+
+ @Override
+ public void processElement(StreamRecord<Committable> element) throws
Exception {
+ Committable committable = element.getValue();
+ if (committable.kind() != Committable.Kind.FILE) {
+            output.collect(element);
+            return;
+        }
+
+ CommitMessageImpl message = (CommitMessageImpl)
committable.wrappedCommittable();
+ checkArgument(message.bucket() == 0);
+ BinaryRow partition = message.partition();
+ partitionFiles
+ .computeIfAbsent(partition, file -> new ArrayList<>())
+ .addAll(message.newFilesIncrement().newFiles());
+ }
+
+ @Override
+ public void endInput() throws Exception {
+ emitAll(Long.MAX_VALUE);
+ }
+
+ protected void emitAll(long checkpointId) {
+ for (Map.Entry<BinaryRow, List<DataFileMeta>> partitionEntry :
partitionFiles.entrySet()) {
+ BinaryRow partition = partitionEntry.getKey();
+ // upgrade the clustered file to outputLevel
+ List<DataFileMeta> clusterAfter =
+ IncrementalClusterManager.upgrade(
+ partitionEntry.getValue(),
outputLevels.get(partition));
+ LOG.info(
+ "Partition {}: upgrade file level to {}",
+ partition,
+ outputLevels.get(partition));
+ CompactIncrement compactIncrement =
+ new CompactIncrement(
+ Collections.emptyList(), clusterAfter,
Collections.emptyList());
+ CommitMessageImpl clusterMessage =
+ new CommitMessageImpl(
+ partition,
+ // bucket 0 is bucket for unaware-bucket table
+ // for compatibility with the old design
+ 0,
+ table.coreOptions().bucket(),
+ DataIncrement.emptyIncrement(),
+ compactIncrement);
+ output.collect(
+ new StreamRecord<>(
+ new Committable(checkpointId,
Committable.Kind.FILE, clusterMessage)));
+ }
+
+ partitionFiles.clear();
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
new file mode 100644
index 0000000000..29f02adfd5
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+
+/** IT cases for incremental clustering action. */
+public class IncrementalClusterActionITCase extends ActionITCaseBase {
+
+ @Test
+ public void testClusterUnpartitionedTable() throws Exception {
+ FileStoreTable table = createTable(null, 1);
+
+ BinaryString randomStr = BinaryString.fromString(randomString(150));
+ List<CommitMessage> messages = new ArrayList<>();
+
+ // first write
+ for (int i = 0; i < 3; i++) {
+ for (int j = 0; j < 3; j++) {
+ messages.addAll(write(GenericRow.of(i, j, randomStr, 0)));
+ }
+ }
+ commit(messages);
+ ReadBuilder readBuilder = table.newReadBuilder().withProjection(new
int[] {0, 1});
+ List<String> result1 =
+ getResult(
+ readBuilder.newRead(),
+ readBuilder.newScan().plan().splits(),
+ readBuilder.readType());
+ List<String> expected1 =
+ Lists.newArrayList(
+ "+I[0, 0]",
+ "+I[0, 1]",
+ "+I[0, 2]",
+ "+I[1, 0]",
+ "+I[1, 1]",
+ "+I[1, 2]",
+ "+I[2, 0]",
+ "+I[2, 1]",
+ "+I[2, 2]");
+ assertThat(result1).containsExactlyElementsOf(expected1);
+
+ // first cluster
+ runAction(Collections.emptyList());
+ checkSnapshot(table);
+ List<Split> splits = readBuilder.newScan().plan().splits();
+ assertThat(splits.size()).isEqualTo(1);
+ assertThat(((DataSplit)
splits.get(0)).dataFiles().size()).isEqualTo(1);
+ assertThat(((DataSplit)
splits.get(0)).dataFiles().get(0).level()).isEqualTo(5);
+ List<String> result2 = getResult(readBuilder.newRead(), splits,
readBuilder.readType());
+ List<String> expected2 =
+ Lists.newArrayList(
+ "+I[0, 0]",
+ "+I[0, 1]",
+ "+I[1, 0]",
+ "+I[1, 1]",
+ "+I[0, 2]",
+ "+I[1, 2]",
+ "+I[2, 0]",
+ "+I[2, 1]",
+ "+I[2, 2]");
+ assertThat(result2).containsExactlyElementsOf(expected2);
+
+ // second write
+ messages.clear();
+ messages.addAll(
+ write(
+ GenericRow.of(0, 3, null, 0),
+ GenericRow.of(1, 3, null, 0),
+ GenericRow.of(2, 3, null, 0)));
+ messages.addAll(
+ write(
+ GenericRow.of(3, 0, null, 0),
+ GenericRow.of(3, 1, null, 0),
+ GenericRow.of(3, 2, null, 0),
+ GenericRow.of(3, 3, null, 0)));
+ commit(messages);
+
+ List<String> result3 =
+ getResult(
+ readBuilder.newRead(),
+ readBuilder.newScan().plan().splits(),
+ readBuilder.readType());
+ List<String> expected3 = new ArrayList<>(expected2);
+ expected3.addAll(
+ Lists.newArrayList(
+ "+I[0, 3]",
+ "+I[1, 3]",
+ "+I[2, 3]",
+ "+I[3, 0]",
+ "+I[3, 1]",
+ "+I[3, 2]",
+ "+I[3, 3]"));
+ assertThat(result3).containsExactlyElementsOf(expected3);
+
+ // second cluster
+ runAction(Collections.emptyList());
+ checkSnapshot(table);
+ splits = readBuilder.newScan().plan().splits();
+ List<String> result4 = getResult(readBuilder.newRead(), splits,
readBuilder.readType());
+ List<String> expected4 = new ArrayList<>(expected2);
+ expected4.addAll(
+ Lists.newArrayList(
+ "+I[0, 3]",
+ "+I[1, 3]",
+ "+I[3, 0]",
+ "+I[3, 1]",
+ "+I[2, 3]",
+ "+I[3, 2]",
+ "+I[3, 3]"));
+ assertThat(splits.size()).isEqualTo(1);
+ assertThat(((DataSplit)
splits.get(0)).dataFiles().size()).isEqualTo(2);
+ assertThat(((DataSplit)
splits.get(0)).dataFiles().get(0).level()).isEqualTo(5);
+ assertThat(((DataSplit)
splits.get(0)).dataFiles().get(1).level()).isEqualTo(4);
+ assertThat(result4).containsExactlyElementsOf(expected4);
+
+ // full cluster
+ runAction(Lists.newArrayList("--compact_strategy", "full"));
+ checkSnapshot(table);
+ splits = readBuilder.newScan().plan().splits();
+ List<String> result5 = getResult(readBuilder.newRead(), splits,
readBuilder.readType());
+ List<String> expected5 =
+ new ArrayList<>(
+ Lists.newArrayList(
+ "+I[0, 0]",
+ "+I[0, 1]",
+ "+I[1, 0]",
+ "+I[1, 1]",
+ "+I[0, 2]",
+ "+I[0, 3]",
+ "+I[1, 2]",
+ "+I[1, 3]",
+ "+I[2, 0]",
+ "+I[2, 1]",
+ "+I[3, 0]",
+ "+I[3, 1]",
+ "+I[2, 2]",
+ "+I[2, 3]",
+ "+I[3, 2]",
+ "+I[3, 3]"));
+ assertThat(splits.size()).isEqualTo(1);
+ assertThat(((DataSplit)
splits.get(0)).dataFiles().size()).isEqualTo(1);
+ assertThat(((DataSplit)
splits.get(0)).dataFiles().get(0).level()).isEqualTo(5);
+ assertThat(result5).containsExactlyElementsOf(expected5);
+ }
+
+ @Test
+ public void testClusterPartitionedTable() throws Exception {
+ FileStoreTable table = createTable("pt", 1);
+
+ BinaryString randomStr = BinaryString.fromString(randomString(150));
+ List<CommitMessage> messages = new ArrayList<>();
+
+ // first write
+ List<String> expected1 = new ArrayList<>();
+ for (int pt = 0; pt < 2; pt++) {
+ for (int i = 0; i < 3; i++) {
+ for (int j = 0; j < 3; j++) {
+ messages.addAll(write(GenericRow.of(i, j, (pt == 0) ?
randomStr : null, pt)));
+ expected1.add(String.format("+I[%s, %s, %s]", i, j, pt));
+ }
+ }
+ }
+ commit(messages);
+ ReadBuilder readBuilder = table.newReadBuilder().withProjection(new
int[] {0, 1, 3});
+ List<String> result1 =
+ getResult(
+ readBuilder.newRead(),
+ readBuilder.newScan().plan().splits(),
+ readBuilder.readType());
+ assertThat(result1).containsExactlyElementsOf(expected1);
+
+ // first cluster
+ runAction(Collections.emptyList());
+ checkSnapshot(table);
+ List<Split> splits = readBuilder.newScan().plan().splits();
+ assertThat(splits.size()).isEqualTo(2);
+ assertThat(((DataSplit)
splits.get(0)).dataFiles().size()).isEqualTo(1);
+ assertThat(((DataSplit)
splits.get(0)).dataFiles().get(0).level()).isEqualTo(5);
+ List<String> result2 = getResult(readBuilder.newRead(), splits,
readBuilder.readType());
+ List<String> expected2 = new ArrayList<>();
+ for (int pt = 0; pt < 2; pt++) {
+ expected2.add(String.format("+I[0, 0, %s]", pt));
+ expected2.add(String.format("+I[0, 1, %s]", pt));
+ expected2.add(String.format("+I[1, 0, %s]", pt));
+ expected2.add(String.format("+I[1, 1, %s]", pt));
+ expected2.add(String.format("+I[0, 2, %s]", pt));
+ expected2.add(String.format("+I[1, 2, %s]", pt));
+ expected2.add(String.format("+I[2, 0, %s]", pt));
+ expected2.add(String.format("+I[2, 1, %s]", pt));
+ expected2.add(String.format("+I[2, 2, %s]", pt));
+ }
+ assertThat(result2).containsExactlyElementsOf(expected2);
+
+ // second write
+ messages.clear();
+ for (int pt = 0; pt < 2; pt++) {
+ messages.addAll(
+ write(
+ GenericRow.of(0, 3, null, pt),
+ GenericRow.of(1, 3, null, pt),
+ GenericRow.of(2, 3, null, pt)));
+ messages.addAll(
+ write(
+ GenericRow.of(3, 0, null, pt),
+ GenericRow.of(3, 1, null, pt),
+ GenericRow.of(3, 2, null, pt),
+ GenericRow.of(3, 3, null, pt)));
+ }
+ commit(messages);
+
+ List<String> result3 =
+ getResult(
+ readBuilder.newRead(),
+ readBuilder.newScan().plan().splits(),
+ readBuilder.readType());
+ List<String> expected3 = new ArrayList<>();
+ for (int pt = 0; pt < 2; pt++) {
+ expected3.addAll(expected2.subList(9 * pt, 9 * pt + 9));
+ expected3.add(String.format("+I[0, 3, %s]", pt));
+ expected3.add(String.format("+I[1, 3, %s]", pt));
+ expected3.add(String.format("+I[2, 3, %s]", pt));
+ expected3.add(String.format("+I[3, 0, %s]", pt));
+ expected3.add(String.format("+I[3, 1, %s]", pt));
+ expected3.add(String.format("+I[3, 2, %s]", pt));
+ expected3.add(String.format("+I[3, 3, %s]", pt));
+ }
+ assertThat(result3).containsExactlyElementsOf(expected3);
+
+ // second cluster
+ runAction(Collections.emptyList());
+ checkSnapshot(table);
+ splits = readBuilder.newScan().plan().splits();
+ List<String> result4 = getResult(readBuilder.newRead(), splits,
readBuilder.readType());
+ List<String> expected4 = new ArrayList<>();
+        // for partition-0: only files in level-0 will be picked for clustering, outputLevel is 4
+ expected4.add("+I[0, 0, 0]");
+ expected4.add("+I[0, 1, 0]");
+ expected4.add("+I[1, 0, 0]");
+ expected4.add("+I[1, 1, 0]");
+ expected4.add("+I[0, 2, 0]");
+ expected4.add("+I[1, 2, 0]");
+ expected4.add("+I[2, 0, 0]");
+ expected4.add("+I[2, 1, 0]");
+ expected4.add("+I[2, 2, 0]");
+ expected4.add("+I[0, 3, 0]");
+ expected4.add("+I[1, 3, 0]");
+ expected4.add("+I[3, 0, 0]");
+ expected4.add("+I[3, 1, 0]");
+ expected4.add("+I[2, 3, 0]");
+ expected4.add("+I[3, 2, 0]");
+ expected4.add("+I[3, 3, 0]");
+        // for partition-1: all files will be picked for clustering, outputLevel is 5
+ expected4.add("+I[0, 0, 1]");
+ expected4.add("+I[0, 1, 1]");
+ expected4.add("+I[1, 0, 1]");
+ expected4.add("+I[1, 1, 1]");
+ expected4.add("+I[0, 2, 1]");
+ expected4.add("+I[0, 3, 1]");
+ expected4.add("+I[1, 2, 1]");
+ expected4.add("+I[1, 3, 1]");
+ expected4.add("+I[2, 0, 1]");
+ expected4.add("+I[2, 1, 1]");
+ expected4.add("+I[3, 0, 1]");
+ expected4.add("+I[3, 1, 1]");
+ expected4.add("+I[2, 2, 1]");
+ expected4.add("+I[2, 3, 1]");
+ expected4.add("+I[3, 2, 1]");
+ expected4.add("+I[3, 3, 1]");
+ assertThat(splits.size()).isEqualTo(2);
+ assertThat(((DataSplit)
splits.get(0)).dataFiles().size()).isEqualTo(2);
+ assertThat(((DataSplit)
splits.get(1)).dataFiles().size()).isEqualTo(1);
+ assertThat(((DataSplit)
splits.get(0)).dataFiles().get(0).level()).isEqualTo(5);
+ assertThat(((DataSplit)
splits.get(0)).dataFiles().get(1).level()).isEqualTo(4);
+ assertThat(((DataSplit)
splits.get(1)).dataFiles().get(0).level()).isEqualTo(5);
+ assertThat(result4).containsExactlyElementsOf(expected4);
+ }
+
+ @Test
+ public void testClusterOnEmptyData() throws Exception {
+ createTable("pt", 1);
+ assertThatCode(() ->
runAction(Collections.emptyList())).doesNotThrowAnyException();
+ }
+
+ protected FileStoreTable createTable(String partitionKeys, int
sinkParallelism)
+ throws Exception {
+ catalog.createDatabase(database, true);
+ catalog.createTable(identifier(), schema(partitionKeys,
sinkParallelism), true);
+ return (FileStoreTable) catalog.getTable(identifier());
+ }
+
+ private FileStoreTable getTable() throws Exception {
+ return (FileStoreTable) catalog.getTable(identifier());
+ }
+
+ private Identifier identifier() {
+ return Identifier.create(database, tableName);
+ }
+
+ private List<CommitMessage> write(GenericRow... data) throws Exception {
+ BatchWriteBuilder builder = getTable().newBatchWriteBuilder();
+ try (BatchTableWrite batchTableWrite = builder.newWrite()) {
+ for (GenericRow row : data) {
+ batchTableWrite.write(row);
+ }
+ return batchTableWrite.prepareCommit();
+ }
+ }
+
+ private void commit(List<CommitMessage> messages) throws Exception {
+ BatchTableCommit commit =
getTable().newBatchWriteBuilder().newCommit();
+ commit.commit(messages);
+ commit.close();
+ }
+
+ private static Schema schema(String partitionKeys, int sinkParallelism) {
+ Schema.Builder schemaBuilder = Schema.newBuilder();
+ schemaBuilder.column("a", DataTypes.INT());
+ schemaBuilder.column("b", DataTypes.INT());
+ schemaBuilder.column("c", DataTypes.STRING());
+ schemaBuilder.column("pt", DataTypes.INT());
+ schemaBuilder.option("bucket", "-1");
+ schemaBuilder.option("num-levels", "6");
+ schemaBuilder.option("num-sorted-run.compaction-trigger", "2");
+ schemaBuilder.option("scan.plan-sort-partition", "true");
+ schemaBuilder.option("clustering.columns", "a,b");
+ schemaBuilder.option("clustering.strategy", "zorder");
+ schemaBuilder.option("clustering.incremental", "true");
+ schemaBuilder.option("scan.parallelism", "1");
+ schemaBuilder.option("sink.parallelism",
String.valueOf(sinkParallelism));
+ if (!StringUtils.isNullOrWhitespaceOnly(partitionKeys)) {
+ schemaBuilder.partitionKeys(partitionKeys);
+ }
+ return schemaBuilder.build();
+ }
+
+ private static String randomString(int length) {
+ String chars =
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
+ StringBuilder sb = new StringBuilder(length);
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ for (int i = 0; i < length; i++) {
+ sb.append(chars.charAt(random.nextInt(chars.length())));
+ }
+ return sb.toString();
+ }
+
+ private void checkSnapshot(FileStoreTable table) {
+ assertThat(table.latestSnapshot().get().commitKind())
+ .isEqualTo(Snapshot.CommitKind.COMPACT);
+ }
+
+ private void runAction(List<String> extra) throws Exception {
+ StreamExecutionEnvironment env =
streamExecutionEnvironmentBuilder().batchMode().build();
+ ArrayList<String> baseArgs =
+ Lists.newArrayList("compact", "--database", database,
"--table", tableName);
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ if (random.nextBoolean()) {
+ baseArgs.addAll(Lists.newArrayList("--warehouse", warehouse));
+ } else {
+ baseArgs.addAll(Lists.newArrayList("--catalog_conf", "warehouse="
+ warehouse));
+ }
+ baseArgs.addAll(extra);
+
+ CompactAction action = createAction(CompactAction.class,
baseArgs.toArray(new String[0]));
+ // action.withStreamExecutionEnvironment(env).build();
+ // env.execute();
+ action.withStreamExecutionEnvironment(env);
+ action.run();
+ }
+}
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index 2796b6deb6..3e7608d1a9 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -620,7 +620,7 @@ public class CompactProcedure extends BaseProcedure {
List<DataFileMeta> clusterBefore =
compactUnits.get(partition).files();
// upgrade the clustered file to outputLevel
List<DataFileMeta> clusterAfter =
- incrementalClusterManager.upgrade(
+ IncrementalClusterManager.upgrade(
entry.getValue(),
compactUnits.get(partition).outputLevel());
LOG.info(
"Partition {}: upgrade file level to {}",