This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b6d43d2bd [flink] support performing incremental clustering by flink (#6395)
9b6d43d2bd is described below

commit 9b6d43d2bd2eeae54623218be1796690a9a41afa
Author: LsomeYeah <[email protected]>
AuthorDate: Tue Oct 14 16:00:37 2025 +0800

    [flink] support performing incremental clustering by flink (#6395)
---
 .../content/append-table/incremental-clustering.md |  49 ++-
 .../append/cluster/IncrementalClusterManager.java  |   3 +-
 .../apache/paimon/flink/action/CompactAction.java  | 153 +++++++-
 .../cluster/IncrementalClusterSplitSource.java     | 118 ++++++
 .../cluster/RemoveClusterBeforeFilesOperator.java  |  58 +++
 ...writeIncrementalClusterCommittableOperator.java | 112 ++++++
 .../action/IncrementalClusterActionITCase.java     | 413 +++++++++++++++++++++
 .../paimon/spark/procedure/CompactProcedure.java   |   2 +-
 8 files changed, 896 insertions(+), 12 deletions(-)

diff --git a/docs/content/append-table/incremental-clustering.md b/docs/content/append-table/incremental-clustering.md
index 72f24ec17e..1cb08cbbdb 100644
--- a/docs/content/append-table/incremental-clustering.md
+++ b/docs/content/append-table/incremental-clustering.md
@@ -95,11 +95,14 @@ clustering and small-file merging must be performed exclusively via Incremental
 ## Run Incremental Clustering
 {{< hint info >}}
 
-Currently, only support running Incremental Clustering in spark, support for flink will be added in the near future.
+Currently, Incremental Clustering is only supported in batch mode.
 
 {{< /hint >}}
 
-To run a Incremental Clustering job, follow these instructions.
+To run an Incremental Clustering job, follow these instructions.
+
+You don’t need to specify any clustering-related parameters when running Incremental Clustering;
+these options are already defined as table options. If you need to change clustering settings, please update the corresponding table options.
 
 {{< tabs "incremental-clustering" >}}
 
@@ -117,8 +120,46 @@ CALL sys.compact(table => 'T')
 -- run incremental clustering with full mode, this will recluster all data
 CALL sys.compact(table => 'T', compact_strategy => 'full')
 ```
-You don’t need to specify any clustering-related parameters when running Incremental Clustering, 
-these are already defined as table options. If you need to change clustering settings, please update the corresponding table options.
+{{< /tab >}}
+
+{{< tab "Flink Action" >}}
+
+Run the following command to submit an incremental clustering job for the table.
+
+```bash
+<FLINK_HOME>/bin/flink run \
+    /path/to/paimon-flink-action-{{< version >}}.jar \
+    compact \
+    --warehouse <warehouse-path> \
+    --database <database-name> \
+    --table <table-name> \
+    [--compact_strategy <minor / full>] \
+    [--table_conf <table_conf>] \
+    [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
+```
+
+Example: run incremental clustering
+
+```bash
+<FLINK_HOME>/bin/flink run \
+    /path/to/paimon-flink-action-{{< version >}}.jar \
+    compact \
+    --warehouse s3:///path/to/warehouse \
+    --database test_db \
+    --table test_table \
+    --table_conf sink.parallelism=2 \
+    --compact_strategy minor \
+    --catalog_conf s3.endpoint=https://****.com \
+    --catalog_conf s3.access-key=***** \
+    --catalog_conf s3.secret-key=*****
+```
+* `--compact_strategy` Determines how to pick the files to be clustered; the default is `minor`.
+    * `full` : All files will be selected for clustering.
+    * `minor` : Pick the set of files that need to be clustered based on specified conditions.
+
+Note: write parallelism is set by `sink.parallelism`; if it is too large, the job may generate a large number of small files.
+
+You can use `-D execution.runtime-mode=batch` or `-yD execution.runtime-mode=batch` (for the ON-YARN scenario) to use batch mode.
 {{< /tab >}}
 
 {{< /tabs >}}
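
As the updated documentation above notes, the compact action takes no clustering parameters: the behaviour comes entirely from table options. A minimal Java sketch of such a table definition, mirroring the options used by the new `IncrementalClusterActionITCase` further down in this diff (the column names here are illustrative only):

```java
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataTypes;

public class ClusteredTableSchemaSketch {
    // Sketch only: an unaware-bucket append table whose clustering behaviour
    // is driven by table options, so `compact` needs no clustering flags.
    public static Schema clusteredSchema() {
        Schema.Builder builder = Schema.newBuilder();
        builder.column("a", DataTypes.INT());
        builder.column("b", DataTypes.INT());
        builder.option("bucket", "-1");                    // unaware-bucket append table
        builder.option("clustering.columns", "a,b");       // columns to cluster by
        builder.option("clustering.strategy", "zorder");   // clustering curve
        builder.option("clustering.incremental", "true");  // enable incremental clustering
        return builder.build();
    }
}
```

With these options set on the table, both the Spark procedure and the new Flink action pick up the clustering configuration from the table metadata.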
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
index 887150577c..407f6b17ef 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
@@ -226,7 +226,8 @@ public class IncrementalClusterManager {
         return splits;
     }
 
-    public List<DataFileMeta> upgrade(List<DataFileMeta> filesAfterCluster, 
int outputLevel) {
+    public static List<DataFileMeta> upgrade(
+            List<DataFileMeta> filesAfterCluster, int outputLevel) {
         return filesAfterCluster.stream()
                 .map(file -> file.upgrade(outputLevel))
                 .collect(Collectors.toList());
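
The only change to this file makes `upgrade` static, so the new Flink rewrite operator below and the Spark `CompactProcedure` at the end of this email can both call it without holding an `IncrementalClusterManager` instance. A small usage sketch (the wrapper method is illustrative, not part of the patch):

```java
import org.apache.paimon.append.cluster.IncrementalClusterManager;
import org.apache.paimon.io.DataFileMeta;

import java.util.List;

class UpgradeSketch {
    // Sketch only: promote the files produced by a cluster task to the
    // output level that was planned for their partition.
    static List<DataFileMeta> toOutputLevel(List<DataFileMeta> filesAfterCluster, int outputLevel) {
        return IncrementalClusterManager.upgrade(filesAfterCluster, outputLevel);
    }
}
```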
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 602d3a59a5..86259582fa 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -19,9 +19,13 @@
 package org.apache.paimon.flink.action;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.append.cluster.IncrementalClusterManager;
+import org.apache.paimon.compact.CompactUnit;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.cluster.IncrementalClusterSplitSource;
+import 
org.apache.paimon.flink.cluster.RewriteIncrementalClusterCommittableOperator;
 import org.apache.paimon.flink.compact.AppendTableCompactBuilder;
 import org.apache.paimon.flink.postpone.PostponeBucketCompactSplitSource;
 import 
org.apache.paimon.flink.postpone.RewritePostponeBucketCommittableOperator;
@@ -32,7 +36,10 @@ import org.apache.paimon.flink.sink.CompactorSinkBuilder;
 import org.apache.paimon.flink.sink.FixedBucketSink;
 import org.apache.paimon.flink.sink.FlinkSinkBuilder;
 import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
+import org.apache.paimon.flink.sink.RowAppendTableSink;
 import org.apache.paimon.flink.sink.RowDataChannelComputer;
+import org.apache.paimon.flink.sorter.TableSortInfo;
+import org.apache.paimon.flink.sorter.TableSorter;
 import org.apache.paimon.flink.source.CompactorSourceBuilder;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.options.Options;
@@ -43,6 +50,7 @@ import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.predicate.PredicateProjectionConverter;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.InternalRowPartitionComputer;
 import org.apache.paimon.utils.Pair;
@@ -68,6 +76,7 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import static 
org.apache.paimon.partition.PartitionPredicate.createBinaryPartitions;
 import static 
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
@@ -101,10 +110,6 @@ public class CompactAction extends TableActionBase {
         checkArgument(
                 !((FileStoreTable) table).coreOptions().dataEvolutionEnabled(),
                 "Compact action does not support data evolution table yet. ");
-        checkArgument(
-                !(((FileStoreTable) table).bucketMode() == 
BucketMode.BUCKET_UNAWARE
-                        && ((FileStoreTable) 
table).coreOptions().clusteringIncrementalEnabled()),
-                "The table has enabled incremental clustering, and do not 
support compact in flink yet.");
         HashMap<String, String> dynamicOptions = new HashMap<>(tableConf);
         dynamicOptions.put(CoreOptions.WRITE_ONLY.key(), "false");
         table = table.copy(dynamicOptions);
@@ -148,8 +153,12 @@ public class CompactAction extends TableActionBase {
         if (fileStoreTable.coreOptions().bucket() == 
BucketMode.POSTPONE_BUCKET) {
             return buildForPostponeBucketCompaction(env, fileStoreTable, 
isStreaming);
         } else if (fileStoreTable.bucketMode() == BucketMode.BUCKET_UNAWARE) {
-            buildForAppendTableCompact(env, fileStoreTable, isStreaming);
-            return true;
+            if (fileStoreTable.coreOptions().clusteringIncrementalEnabled()) {
+                return buildForIncrementalClustering(env, fileStoreTable, 
isStreaming);
+            } else {
+                buildForAppendTableCompact(env, fileStoreTable, isStreaming);
+                return true;
+            }
         } else {
             buildForBucketedTableCompact(env, fileStoreTable, isStreaming);
             return true;
@@ -203,6 +212,138 @@ public class CompactAction extends TableActionBase {
         builder.build();
     }
 
+    private boolean buildForIncrementalClustering(
+            StreamExecutionEnvironment env, FileStoreTable table, boolean 
isStreaming) {
+        checkArgument(!isStreaming, "Incremental clustering currently only 
supports batch mode");
+        checkArgument(
+                partitions == null,
+                "Incremental clustering currently does not support specifying 
partitions");
+        checkArgument(
+                whereSql == null, "Incremental clustering currently does not 
support predicates");
+
+        IncrementalClusterManager incrementalClusterManager = new 
IncrementalClusterManager(table);
+
+        // non-full strategy as default for incremental clustering
+        if (fullCompaction == null) {
+            fullCompaction = false;
+        }
+        Options options = new Options(table.options());
+        int localSampleMagnification = 
table.coreOptions().getLocalSampleMagnification();
+        if (localSampleMagnification < 20) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "the config '%s=%d' should not be set too small, 
greater than or equal to 20 is needed.",
+                            
CoreOptions.SORT_COMPACTION_SAMPLE_MAGNIFICATION.key(),
+                            localSampleMagnification));
+        }
+        String commitUser = CoreOptions.createCommitUser(options);
+        InternalRowPartitionComputer partitionComputer =
+                new InternalRowPartitionComputer(
+                        table.coreOptions().partitionDefaultName(),
+                        table.store().partitionType(),
+                        table.partitionKeys().toArray(new String[0]),
+                        table.coreOptions().legacyPartitionName());
+
+        // 1. pick cluster files for each partition
+        Map<BinaryRow, CompactUnit> compactUnits =
+                incrementalClusterManager.prepareForCluster(fullCompaction);
+        if (compactUnits.isEmpty()) {
+            LOGGER.info(
+                    "No partition needs to be incrementally clustered. "
+                            + "Please set '--compact_strategy full' if you 
need to forcibly trigger the cluster.");
+            if (this.forceStartFlinkJob) {
+                env.fromSequence(0, 0)
+                        .name("Nothing to Cluster Source")
+                        .sinkTo(new DiscardingSink<>());
+                return true;
+            } else {
+                return false;
+            }
+        }
+        Map<BinaryRow, DataSplit[]> partitionSplits =
+                compactUnits.entrySet().stream()
+                        .collect(
+                                Collectors.toMap(
+                                        Map.Entry::getKey,
+                                        entry ->
+                                                incrementalClusterManager
+                                                        .toSplits(
+                                                                entry.getKey(),
+                                                                
entry.getValue().files())
+                                                        .toArray(new 
DataSplit[0])));
+
+        // 2. read, sort and write within each partition
+        List<DataStream<Committable>> dataStreams = new ArrayList<>();
+
+        for (Map.Entry<BinaryRow, DataSplit[]> entry : 
partitionSplits.entrySet()) {
+            DataSplit[] splits = entry.getValue();
+            LinkedHashMap<String, String> partitionSpec =
+                    partitionComputer.generatePartValues(entry.getKey());
+            // 2.1 generate source for current partition
+            Pair<DataStream<RowData>, DataStream<Committable>> sourcePair =
+                    IncrementalClusterSplitSource.buildSource(
+                            env,
+                            table,
+                            partitionSpec,
+                            splits,
+                            
options.get(FlinkConnectorOptions.SCAN_PARALLELISM));
+
+            // 2.2 cluster in partition
+            Integer sinkParallelism = 
options.get(FlinkConnectorOptions.SINK_PARALLELISM);
+            if (sinkParallelism == null) {
+                sinkParallelism = sourcePair.getLeft().getParallelism();
+            }
+            TableSortInfo sortInfo =
+                    new TableSortInfo.Builder()
+                            
.setSortColumns(incrementalClusterManager.clusterKeys())
+                            
.setSortStrategy(incrementalClusterManager.clusterCurve())
+                            .setSinkParallelism(sinkParallelism)
+                            .setLocalSampleSize(sinkParallelism * 
localSampleMagnification)
+                            .setGlobalSampleSize(sinkParallelism * 1000)
+                            .setRangeNumber(sinkParallelism * 10)
+                            .build();
+            DataStream<RowData> sorted =
+                    TableSorter.getSorter(env, sourcePair.getLeft(), table, 
sortInfo).sort();
+
+            // 2.3 write and then reorganize the committable
+            // set parallelism to null, and it'll forward parallelism when 
doWrite()
+            RowAppendTableSink sink = new RowAppendTableSink(table, null, 
null, null);
+            boolean blobAsDescriptor = table.coreOptions().blobAsDescriptor();
+            DataStream<Committable> clusterCommittable =
+                    sink.doWrite(
+                                    FlinkSinkBuilder.mapToInternalRow(
+                                            sorted,
+                                            table.rowType(),
+                                            blobAsDescriptor,
+                                            
table.catalogEnvironment().catalogContext()),
+                                    commitUser,
+                                    null)
+                            .transform(
+                                    "Rewrite cluster committable",
+                                    new CommittableTypeInfo(),
+                                    new 
RewriteIncrementalClusterCommittableOperator(
+                                            table,
+                                            compactUnits.entrySet().stream()
+                                                    .collect(
+                                                            Collectors.toMap(
+                                                                    
Map.Entry::getKey,
+                                                                    unit ->
+                                                                            
unit.getValue()
+                                                                               
     .outputLevel()))));
+            dataStreams.add(clusterCommittable);
+            dataStreams.add(sourcePair.getRight());
+        }
+
+        // 3. commit
+        RowAppendTableSink sink = new RowAppendTableSink(table, null, null, 
null);
+        DataStream<Committable> dataStream = dataStreams.get(0);
+        for (int i = 1; i < dataStreams.size(); i++) {
+            dataStream = dataStream.union(dataStreams.get(i));
+        }
+        sink.doCommit(dataStream, commitUser);
+        return true;
+    }
+
     protected PartitionPredicate getPartitionPredicate() throws Exception {
         checkArgument(
                 partitions == null || whereSql == null,
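
In short, `buildForIncrementalClustering` above plans compact units per partition, builds a bounded split source plus a sort-and-write pipeline for each partition, and finally unions all committable streams so a single committer commits them together. A minimal sketch of that union step using plain Flink APIs (the class and method names here are illustrative, not part of the patch):

```java
import org.apache.flink.streaming.api.datastream.DataStream;

import java.util.List;

final class UnionSketch {
    // Sketch only: merge the per-partition committable streams into one stream,
    // so a single downstream committer can commit them in one go.
    static <T> DataStream<T> unionAll(List<DataStream<T>> streams) {
        DataStream<T> result = streams.get(0);
        for (int i = 1; i < streams.size(); i++) {
            result = result.union(streams.get(i));
        }
        return result;
    }
}
```

Funnelling everything through one committer is what lets the whole batch job land as a single COMPACT snapshot, which the new ITCase verifies via `checkSnapshot`.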
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java
new file mode 100644
index 0000000000..eae1d3b8e9
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/IncrementalClusterSplitSource.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.cluster;
+
+import org.apache.paimon.flink.LogicalTypeConversion;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.sink.CommittableTypeInfo;
+import org.apache.paimon.flink.source.AbstractNonCoordinatedSource;
+import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader;
+import org.apache.paimon.flink.source.SimpleSourceSplit;
+import org.apache.paimon.flink.source.operator.ReadOperator;
+import org.apache.paimon.flink.utils.JavaTypeInfo;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.utils.Pair;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.core.io.InputStatus;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+/** Source for Incremental Clustering. */
+public class IncrementalClusterSplitSource extends 
AbstractNonCoordinatedSource<Split> {
+    private static final long serialVersionUID = 1L;
+
+    private final Split[] splits;
+
+    public IncrementalClusterSplitSource(Split[] splits) {
+        this.splits = splits;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.BOUNDED;
+    }
+
+    @Override
+    public SourceReader<Split, SimpleSourceSplit> 
createReader(SourceReaderContext readerContext)
+            throws Exception {
+        return new IncrementalClusterSplitSource.Reader();
+    }
+
+    private class Reader extends AbstractNonCoordinatedSourceReader<Split> {
+
+        @Override
+        public InputStatus pollNext(ReaderOutput<Split> output) throws 
Exception {
+            for (Split split : splits) {
+                DataSplit dataSplit = (DataSplit) split;
+                output.collect(dataSplit);
+            }
+            return InputStatus.END_OF_INPUT;
+        }
+    }
+
+    public static Pair<DataStream<RowData>, DataStream<Committable>> 
buildSource(
+            StreamExecutionEnvironment env,
+            FileStoreTable table,
+            Map<String, String> partitionSpec,
+            DataSplit[] splits,
+            @Nullable Integer parallelism) {
+        DataStreamSource<Split> source =
+                env.fromSource(
+                        new IncrementalClusterSplitSource(splits),
+                        WatermarkStrategy.noWatermarks(),
+                        String.format(
+                                "Incremental-cluster split generator: %s - %s",
+                                table.fullName(), partitionSpec),
+                        new JavaTypeInfo<>(Split.class));
+
+        if (parallelism != null) {
+            source.setParallelism(parallelism);
+        }
+
+        return Pair.of(
+                new DataStream<>(source.getExecutionEnvironment(), 
source.getTransformation())
+                        .transform(
+                                String.format(
+                                        "Incremental-cluster reader: %s - %s",
+                                        table.fullName(), partitionSpec),
+                                InternalTypeInfo.of(
+                                        
LogicalTypeConversion.toLogicalType(table.rowType())),
+                                new ReadOperator(table::newRead, null, null)),
+                source.forward()
+                        .transform(
+                                "Remove files to be clustered",
+                                new CommittableTypeInfo(),
+                                new RemoveClusterBeforeFilesOperator())
+                        .forceNonParallel());
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RemoveClusterBeforeFilesOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RemoveClusterBeforeFilesOperator.java
new file mode 100644
index 0000000000..907a69ba12
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RemoveClusterBeforeFilesOperator.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.cluster;
+
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.utils.BoundedOneInputOperator;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.Split;
+
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.Collections;
+
+/** Operator used with {@link IncrementalClusterSplitSource}, to remove files 
to be clustered. */
+public class RemoveClusterBeforeFilesOperator extends 
BoundedOneInputOperator<Split, Committable> {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public void processElement(StreamRecord<Split> element) throws Exception {
+        DataSplit dataSplit = (DataSplit) element.getValue();
+        CommitMessageImpl message =
+                new CommitMessageImpl(
+                        dataSplit.partition(),
+                        dataSplit.bucket(),
+                        dataSplit.totalBuckets(),
+                        DataIncrement.emptyIncrement(),
+                        new CompactIncrement(
+                                dataSplit.dataFiles(),
+                                Collections.emptyList(),
+                                Collections.emptyList()));
+        output.collect(
+                new StreamRecord<>(
+                        new Committable(Long.MAX_VALUE, Committable.Kind.FILE, 
message)));
+    }
+
+    @Override
+    public void endInput() throws Exception {}
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RewriteIncrementalClusterCommittableOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RewriteIncrementalClusterCommittableOperator.java
new file mode 100644
index 0000000000..6b0ef47f3f
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/cluster/RewriteIncrementalClusterCommittableOperator.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.cluster;
+
+import org.apache.paimon.append.cluster.IncrementalClusterManager;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.utils.BoundedOneInputOperator;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** Rewrite committables for the new files written after clustering. */
+public class RewriteIncrementalClusterCommittableOperator
+        extends BoundedOneInputOperator<Committable, Committable> {
+    private static final long serialVersionUID = 1L;
+
+    private final FileStoreTable table;
+    private final Map<BinaryRow, Integer> outputLevels;
+
+    private transient Map<BinaryRow, List<DataFileMeta>> partitionFiles;
+
+    public RewriteIncrementalClusterCommittableOperator(
+            FileStoreTable table, Map<BinaryRow, Integer> outputLevels) {
+        this.table = table;
+        this.outputLevels = outputLevels;
+    }
+
+    @Override
+    public void open() throws Exception {
+        partitionFiles = new HashMap<>();
+    }
+
+    @Override
+    public void processElement(StreamRecord<Committable> element) throws 
Exception {
+        Committable committable = element.getValue();
+        if (committable.kind() != Committable.Kind.FILE) {
+            output.collect(element);
+            return;
+        }
+
+        CommitMessageImpl message = (CommitMessageImpl) 
committable.wrappedCommittable();
+        checkArgument(message.bucket() == 0);
+        BinaryRow partition = message.partition();
+        partitionFiles
+                .computeIfAbsent(partition, file -> new ArrayList<>())
+                .addAll(message.newFilesIncrement().newFiles());
+    }
+
+    @Override
+    public void endInput() throws Exception {
+        emitAll(Long.MAX_VALUE);
+    }
+
+    protected void emitAll(long checkpointId) {
+        for (Map.Entry<BinaryRow, List<DataFileMeta>> partitionEntry : 
partitionFiles.entrySet()) {
+            BinaryRow partition = partitionEntry.getKey();
+            // upgrade the clustered file to outputLevel
+            List<DataFileMeta> clusterAfter =
+                    IncrementalClusterManager.upgrade(
+                            partitionEntry.getValue(), 
outputLevels.get(partition));
+            LOG.info(
+                    "Partition {}: upgrade file level to {}",
+                    partition,
+                    outputLevels.get(partition));
+            CompactIncrement compactIncrement =
+                    new CompactIncrement(
+                            Collections.emptyList(), clusterAfter, 
Collections.emptyList());
+            CommitMessageImpl clusterMessage =
+                    new CommitMessageImpl(
+                            partition,
+                            // bucket 0 is bucket for unaware-bucket table
+                            // for compatibility with the old design
+                            0,
+                            table.coreOptions().bucket(),
+                            DataIncrement.emptyIncrement(),
+                            compactIncrement);
+            output.collect(
+                    new StreamRecord<>(
+                            new Committable(checkpointId, 
Committable.Kind.FILE, clusterMessage)));
+        }
+
+        partitionFiles.clear();
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
new file mode 100644
index 0000000000..29f02adfd5
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+
+/** IT cases for incremental clustering action. */
+public class IncrementalClusterActionITCase extends ActionITCaseBase {
+
+    @Test
+    public void testClusterUnpartitionedTable() throws Exception {
+        FileStoreTable table = createTable(null, 1);
+
+        BinaryString randomStr = BinaryString.fromString(randomString(150));
+        List<CommitMessage> messages = new ArrayList<>();
+
+        // first write
+        for (int i = 0; i < 3; i++) {
+            for (int j = 0; j < 3; j++) {
+                messages.addAll(write(GenericRow.of(i, j, randomStr, 0)));
+            }
+        }
+        commit(messages);
+        ReadBuilder readBuilder = table.newReadBuilder().withProjection(new 
int[] {0, 1});
+        List<String> result1 =
+                getResult(
+                        readBuilder.newRead(),
+                        readBuilder.newScan().plan().splits(),
+                        readBuilder.readType());
+        List<String> expected1 =
+                Lists.newArrayList(
+                        "+I[0, 0]",
+                        "+I[0, 1]",
+                        "+I[0, 2]",
+                        "+I[1, 0]",
+                        "+I[1, 1]",
+                        "+I[1, 2]",
+                        "+I[2, 0]",
+                        "+I[2, 1]",
+                        "+I[2, 2]");
+        assertThat(result1).containsExactlyElementsOf(expected1);
+
+        // first cluster
+        runAction(Collections.emptyList());
+        checkSnapshot(table);
+        List<Split> splits = readBuilder.newScan().plan().splits();
+        assertThat(splits.size()).isEqualTo(1);
+        assertThat(((DataSplit) 
splits.get(0)).dataFiles().size()).isEqualTo(1);
+        assertThat(((DataSplit) 
splits.get(0)).dataFiles().get(0).level()).isEqualTo(5);
+        List<String> result2 = getResult(readBuilder.newRead(), splits, 
readBuilder.readType());
+        List<String> expected2 =
+                Lists.newArrayList(
+                        "+I[0, 0]",
+                        "+I[0, 1]",
+                        "+I[1, 0]",
+                        "+I[1, 1]",
+                        "+I[0, 2]",
+                        "+I[1, 2]",
+                        "+I[2, 0]",
+                        "+I[2, 1]",
+                        "+I[2, 2]");
+        assertThat(result2).containsExactlyElementsOf(expected2);
+
+        // second write
+        messages.clear();
+        messages.addAll(
+                write(
+                        GenericRow.of(0, 3, null, 0),
+                        GenericRow.of(1, 3, null, 0),
+                        GenericRow.of(2, 3, null, 0)));
+        messages.addAll(
+                write(
+                        GenericRow.of(3, 0, null, 0),
+                        GenericRow.of(3, 1, null, 0),
+                        GenericRow.of(3, 2, null, 0),
+                        GenericRow.of(3, 3, null, 0)));
+        commit(messages);
+
+        List<String> result3 =
+                getResult(
+                        readBuilder.newRead(),
+                        readBuilder.newScan().plan().splits(),
+                        readBuilder.readType());
+        List<String> expected3 = new ArrayList<>(expected2);
+        expected3.addAll(
+                Lists.newArrayList(
+                        "+I[0, 3]",
+                        "+I[1, 3]",
+                        "+I[2, 3]",
+                        "+I[3, 0]",
+                        "+I[3, 1]",
+                        "+I[3, 2]",
+                        "+I[3, 3]"));
+        assertThat(result3).containsExactlyElementsOf(expected3);
+
+        // second cluster
+        runAction(Collections.emptyList());
+        checkSnapshot(table);
+        splits = readBuilder.newScan().plan().splits();
+        List<String> result4 = getResult(readBuilder.newRead(), splits, 
readBuilder.readType());
+        List<String> expected4 = new ArrayList<>(expected2);
+        expected4.addAll(
+                Lists.newArrayList(
+                        "+I[0, 3]",
+                        "+I[1, 3]",
+                        "+I[3, 0]",
+                        "+I[3, 1]",
+                        "+I[2, 3]",
+                        "+I[3, 2]",
+                        "+I[3, 3]"));
+        assertThat(splits.size()).isEqualTo(1);
+        assertThat(((DataSplit) 
splits.get(0)).dataFiles().size()).isEqualTo(2);
+        assertThat(((DataSplit) 
splits.get(0)).dataFiles().get(0).level()).isEqualTo(5);
+        assertThat(((DataSplit) 
splits.get(0)).dataFiles().get(1).level()).isEqualTo(4);
+        assertThat(result4).containsExactlyElementsOf(expected4);
+
+        // full cluster
+        runAction(Lists.newArrayList("--compact_strategy", "full"));
+        checkSnapshot(table);
+        splits = readBuilder.newScan().plan().splits();
+        List<String> result5 = getResult(readBuilder.newRead(), splits, 
readBuilder.readType());
+        List<String> expected5 =
+                new ArrayList<>(
+                        Lists.newArrayList(
+                                "+I[0, 0]",
+                                "+I[0, 1]",
+                                "+I[1, 0]",
+                                "+I[1, 1]",
+                                "+I[0, 2]",
+                                "+I[0, 3]",
+                                "+I[1, 2]",
+                                "+I[1, 3]",
+                                "+I[2, 0]",
+                                "+I[2, 1]",
+                                "+I[3, 0]",
+                                "+I[3, 1]",
+                                "+I[2, 2]",
+                                "+I[2, 3]",
+                                "+I[3, 2]",
+                                "+I[3, 3]"));
+        assertThat(splits.size()).isEqualTo(1);
+        assertThat(((DataSplit) 
splits.get(0)).dataFiles().size()).isEqualTo(1);
+        assertThat(((DataSplit) 
splits.get(0)).dataFiles().get(0).level()).isEqualTo(5);
+        assertThat(result5).containsExactlyElementsOf(expected5);
+    }
+
+    @Test
+    public void testClusterPartitionedTable() throws Exception {
+        FileStoreTable table = createTable("pt", 1);
+
+        BinaryString randomStr = BinaryString.fromString(randomString(150));
+        List<CommitMessage> messages = new ArrayList<>();
+
+        // first write
+        List<String> expected1 = new ArrayList<>();
+        for (int pt = 0; pt < 2; pt++) {
+            for (int i = 0; i < 3; i++) {
+                for (int j = 0; j < 3; j++) {
+                    messages.addAll(write(GenericRow.of(i, j, (pt == 0) ? 
randomStr : null, pt)));
+                    expected1.add(String.format("+I[%s, %s, %s]", i, j, pt));
+                }
+            }
+        }
+        commit(messages);
+        ReadBuilder readBuilder = table.newReadBuilder().withProjection(new 
int[] {0, 1, 3});
+        List<String> result1 =
+                getResult(
+                        readBuilder.newRead(),
+                        readBuilder.newScan().plan().splits(),
+                        readBuilder.readType());
+        assertThat(result1).containsExactlyElementsOf(expected1);
+
+        // first cluster
+        runAction(Collections.emptyList());
+        checkSnapshot(table);
+        List<Split> splits = readBuilder.newScan().plan().splits();
+        assertThat(splits.size()).isEqualTo(2);
+        assertThat(((DataSplit) 
splits.get(0)).dataFiles().size()).isEqualTo(1);
+        assertThat(((DataSplit) 
splits.get(0)).dataFiles().get(0).level()).isEqualTo(5);
+        List<String> result2 = getResult(readBuilder.newRead(), splits, 
readBuilder.readType());
+        List<String> expected2 = new ArrayList<>();
+        for (int pt = 0; pt < 2; pt++) {
+            expected2.add(String.format("+I[0, 0, %s]", pt));
+            expected2.add(String.format("+I[0, 1, %s]", pt));
+            expected2.add(String.format("+I[1, 0, %s]", pt));
+            expected2.add(String.format("+I[1, 1, %s]", pt));
+            expected2.add(String.format("+I[0, 2, %s]", pt));
+            expected2.add(String.format("+I[1, 2, %s]", pt));
+            expected2.add(String.format("+I[2, 0, %s]", pt));
+            expected2.add(String.format("+I[2, 1, %s]", pt));
+            expected2.add(String.format("+I[2, 2, %s]", pt));
+        }
+        assertThat(result2).containsExactlyElementsOf(expected2);
+
+        // second write
+        messages.clear();
+        for (int pt = 0; pt < 2; pt++) {
+            messages.addAll(
+                    write(
+                            GenericRow.of(0, 3, null, pt),
+                            GenericRow.of(1, 3, null, pt),
+                            GenericRow.of(2, 3, null, pt)));
+            messages.addAll(
+                    write(
+                            GenericRow.of(3, 0, null, pt),
+                            GenericRow.of(3, 1, null, pt),
+                            GenericRow.of(3, 2, null, pt),
+                            GenericRow.of(3, 3, null, pt)));
+        }
+        commit(messages);
+
+        List<String> result3 =
+                getResult(
+                        readBuilder.newRead(),
+                        readBuilder.newScan().plan().splits(),
+                        readBuilder.readType());
+        List<String> expected3 = new ArrayList<>();
+        for (int pt = 0; pt < 2; pt++) {
+            expected3.addAll(expected2.subList(9 * pt, 9 * pt + 9));
+            expected3.add(String.format("+I[0, 3, %s]", pt));
+            expected3.add(String.format("+I[1, 3, %s]", pt));
+            expected3.add(String.format("+I[2, 3, %s]", pt));
+            expected3.add(String.format("+I[3, 0, %s]", pt));
+            expected3.add(String.format("+I[3, 1, %s]", pt));
+            expected3.add(String.format("+I[3, 2, %s]", pt));
+            expected3.add(String.format("+I[3, 3, %s]", pt));
+        }
+        assertThat(result3).containsExactlyElementsOf(expected3);
+
+        // second cluster
+        runAction(Collections.emptyList());
+        checkSnapshot(table);
+        splits = readBuilder.newScan().plan().splits();
+        List<String> result4 = getResult(readBuilder.newRead(), splits, 
readBuilder.readType());
+        List<String> expected4 = new ArrayList<>();
+        // for partition-0: only the files in level-0 will be picked for clustering, outputLevel is 4
+        expected4.add("+I[0, 0, 0]");
+        expected4.add("+I[0, 1, 0]");
+        expected4.add("+I[1, 0, 0]");
+        expected4.add("+I[1, 1, 0]");
+        expected4.add("+I[0, 2, 0]");
+        expected4.add("+I[1, 2, 0]");
+        expected4.add("+I[2, 0, 0]");
+        expected4.add("+I[2, 1, 0]");
+        expected4.add("+I[2, 2, 0]");
+        expected4.add("+I[0, 3, 0]");
+        expected4.add("+I[1, 3, 0]");
+        expected4.add("+I[3, 0, 0]");
+        expected4.add("+I[3, 1, 0]");
+        expected4.add("+I[2, 3, 0]");
+        expected4.add("+I[3, 2, 0]");
+        expected4.add("+I[3, 3, 0]");
+        // for partition-1: all files will be picked for clustering, outputLevel is 5
+        expected4.add("+I[0, 0, 1]");
+        expected4.add("+I[0, 1, 1]");
+        expected4.add("+I[1, 0, 1]");
+        expected4.add("+I[1, 1, 1]");
+        expected4.add("+I[0, 2, 1]");
+        expected4.add("+I[0, 3, 1]");
+        expected4.add("+I[1, 2, 1]");
+        expected4.add("+I[1, 3, 1]");
+        expected4.add("+I[2, 0, 1]");
+        expected4.add("+I[2, 1, 1]");
+        expected4.add("+I[3, 0, 1]");
+        expected4.add("+I[3, 1, 1]");
+        expected4.add("+I[2, 2, 1]");
+        expected4.add("+I[2, 3, 1]");
+        expected4.add("+I[3, 2, 1]");
+        expected4.add("+I[3, 3, 1]");
+        assertThat(splits.size()).isEqualTo(2);
+        assertThat(((DataSplit) 
splits.get(0)).dataFiles().size()).isEqualTo(2);
+        assertThat(((DataSplit) 
splits.get(1)).dataFiles().size()).isEqualTo(1);
+        assertThat(((DataSplit) 
splits.get(0)).dataFiles().get(0).level()).isEqualTo(5);
+        assertThat(((DataSplit) 
splits.get(0)).dataFiles().get(1).level()).isEqualTo(4);
+        assertThat(((DataSplit) 
splits.get(1)).dataFiles().get(0).level()).isEqualTo(5);
+        assertThat(result4).containsExactlyElementsOf(expected4);
+    }
+
+    @Test
+    public void testClusterOnEmptyData() throws Exception {
+        createTable("pt", 1);
+        assertThatCode(() -> 
runAction(Collections.emptyList())).doesNotThrowAnyException();
+    }
+
+    protected FileStoreTable createTable(String partitionKeys, int 
sinkParallelism)
+            throws Exception {
+        catalog.createDatabase(database, true);
+        catalog.createTable(identifier(), schema(partitionKeys, 
sinkParallelism), true);
+        return (FileStoreTable) catalog.getTable(identifier());
+    }
+
+    private FileStoreTable getTable() throws Exception {
+        return (FileStoreTable) catalog.getTable(identifier());
+    }
+
+    private Identifier identifier() {
+        return Identifier.create(database, tableName);
+    }
+
+    private List<CommitMessage> write(GenericRow... data) throws Exception {
+        BatchWriteBuilder builder = getTable().newBatchWriteBuilder();
+        try (BatchTableWrite batchTableWrite = builder.newWrite()) {
+            for (GenericRow row : data) {
+                batchTableWrite.write(row);
+            }
+            return batchTableWrite.prepareCommit();
+        }
+    }
+
+    private void commit(List<CommitMessage> messages) throws Exception {
+        BatchTableCommit commit = 
getTable().newBatchWriteBuilder().newCommit();
+        commit.commit(messages);
+        commit.close();
+    }
+
+    private static Schema schema(String partitionKeys, int sinkParallelism) {
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        schemaBuilder.column("a", DataTypes.INT());
+        schemaBuilder.column("b", DataTypes.INT());
+        schemaBuilder.column("c", DataTypes.STRING());
+        schemaBuilder.column("pt", DataTypes.INT());
+        schemaBuilder.option("bucket", "-1");
+        schemaBuilder.option("num-levels", "6");
+        schemaBuilder.option("num-sorted-run.compaction-trigger", "2");
+        schemaBuilder.option("scan.plan-sort-partition", "true");
+        schemaBuilder.option("clustering.columns", "a,b");
+        schemaBuilder.option("clustering.strategy", "zorder");
+        schemaBuilder.option("clustering.incremental", "true");
+        schemaBuilder.option("scan.parallelism", "1");
+        schemaBuilder.option("sink.parallelism", 
String.valueOf(sinkParallelism));
+        if (!StringUtils.isNullOrWhitespaceOnly(partitionKeys)) {
+            schemaBuilder.partitionKeys(partitionKeys);
+        }
+        return schemaBuilder.build();
+    }
+
+    private static String randomString(int length) {
+        String chars = 
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
+        StringBuilder sb = new StringBuilder(length);
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        for (int i = 0; i < length; i++) {
+            sb.append(chars.charAt(random.nextInt(chars.length())));
+        }
+        return sb.toString();
+    }
+
+    private void checkSnapshot(FileStoreTable table) {
+        assertThat(table.latestSnapshot().get().commitKind())
+                .isEqualTo(Snapshot.CommitKind.COMPACT);
+    }
+
+    private void runAction(List<String> extra) throws Exception {
+        StreamExecutionEnvironment env = 
streamExecutionEnvironmentBuilder().batchMode().build();
+        ArrayList<String> baseArgs =
+                Lists.newArrayList("compact", "--database", database, 
"--table", tableName);
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        if (random.nextBoolean()) {
+            baseArgs.addAll(Lists.newArrayList("--warehouse", warehouse));
+        } else {
+            baseArgs.addAll(Lists.newArrayList("--catalog_conf", "warehouse=" 
+ warehouse));
+        }
+        baseArgs.addAll(extra);
+
+        CompactAction action = createAction(CompactAction.class, 
baseArgs.toArray(new String[0]));
+        //        action.withStreamExecutionEnvironment(env).build();
+        //        env.execute();
+        action.withStreamExecutionEnvironment(env);
+        action.run();
+    }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index 2796b6deb6..3e7608d1a9 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -620,7 +620,7 @@ public class CompactProcedure extends BaseProcedure {
                 List<DataFileMeta> clusterBefore = 
compactUnits.get(partition).files();
                 // upgrade the clustered file to outputLevel
                 List<DataFileMeta> clusterAfter =
-                        incrementalClusterManager.upgrade(
+                        IncrementalClusterManager.upgrade(
                                 entry.getValue(), 
compactUnits.get(partition).outputLevel());
                 LOG.info(
                         "Partition {}: upgrade file level to {}",