This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c147031340 [core] Support incremental clustering for append unaware table (#6338)
c147031340 is described below

commit c147031340cfac88fe758f818285aba93e251530
Author: LsomeYeah <[email protected]>
AuthorDate: Mon Sep 29 13:06:35 2025 +0800

    [core] Support incremental clustering for append unaware table (#6338)
---
 .../content/append-table/incremental-clustering.md | 134 ++++++++
 .../shortcodes/generated/core_configuration.html   |   6 +
 .../main/java/org/apache/paimon/CoreOptions.java   |  11 +-
 .../append/cluster/IncrementalClusterManager.java  | 242 +++++++++++++++
 .../append/cluster/IncrementalClusterStrategy.java |  89 ++++++
 .../org/apache/paimon/schema/SchemaValidation.java |  15 +
 .../cluster/IncrementalClusterManagerTest.java     | 206 +++++++++++++
 .../cluster/IncrementalClusterStrategyTest.java    | 222 ++++++++++++++
 .../apache/paimon/flink/action/CompactAction.java  |   4 +
 .../apache/paimon/flink/sink/AppendTableSink.java  |   6 +-
 .../paimon/spark/procedure/CompactProcedure.java   | 131 +++++++-
 .../paimon/spark/commands/PaimonSparkWriter.scala  |   2 +-
 .../spark/procedure/CompactProcedureTestBase.scala | 337 ++++++++++++++++++++-
 13 files changed, 1396 insertions(+), 9 deletions(-)

diff --git a/docs/content/append-table/incremental-clustering.md 
b/docs/content/append-table/incremental-clustering.md
new file mode 100644
index 0000000000..72f24ec17e
--- /dev/null
+++ b/docs/content/append-table/incremental-clustering.md
@@ -0,0 +1,134 @@
+---
+title: "Incremental Clustering"
+weight: 4
+type: docs
+aliases:
+- /append-table/incremental-clustering.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Incremental Clustering
+
+Paimon currently supports ordering append tables with a space-filling curve (SFC); see [sort compact]({{< ref "maintenance/dedicated-compaction#sort-compact" >}}) for more information.
+The resulting data layout typically delivers better performance for queries that filter on the clustering keys.
+However, with the current SortCompaction, each run rewrites the entire dataset even when neither the data nor the clustering keys have changed, which is extremely costly.
+
+To address this, Paimon introduces a more flexible, incremental clustering mechanism called Incremental Clustering.
+On each run, it selects only a subset of files to cluster, avoiding a full rewrite. This enables low-cost, sort-based optimization of the data layout and improves query performance.
+In addition, with Incremental Clustering you can change the clustering keys without rewriting existing data: the layout evolves dynamically as clustering runs and gradually converges to an optimal state, significantly reducing the decision-making complexity around data layout.
+
+
+Incremental Clustering provides:
+- Incremental clustering: only a subset of files is rewritten on each run, minimizing write amplification.
+- Small-file compaction: during rewrites, the target file size is respected.
+- Changing clustering keys: newly ingested data is clustered according to the latest clustering keys.
+- A full mode: when selected, the entire dataset is reclustered.
+
+**Only append unaware-bucket tables support Incremental Clustering.**
+
+## Enable Incremental Clustering
+
+To enable Incremental Clustering, the following configuration needs to be set for the table:
+<table class="table table-bordered">
+    <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Option</th>
+      <th class="text-left" style="width: 10%">Value</th>
+      <th class="text-left" style="width: 5%">Required</th>
+      <th class="text-left" style="width: 10%">Type</th>
+      <th class="text-left" style="width: 55%">Description</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>clustering.incremental</h5></td>
+      <td>true</td>
+      <td style="word-wrap: break-word;">Yes</td>
+      <td>Boolean</td>
+      <td>Must be set to true to enable incremental clustering. Default is false.</td>
+    </tr>
+    <tr>
+      <td><h5>clustering.columns</h5></td>
+      <td>'clustering-columns'</td>
+      <td style="word-wrap: break-word;">Yes</td>
+      <td>String</td>
+      <td>The clustering columns, in the format 'columnName1,columnName2'. It is not recommended to use partition keys as clustering keys.</td>
+    </tr>
+    <tr>
+      <td><h5>clustering.strategy</h5></td>
+      <td>'zorder' or 'hilbert' or 'order'</td>
+      <td style="word-wrap: break-word;">No</td>
+      <td>String</td>
+      <td>The ordering algorithm used for clustering. If not set, it is decided based on the number of clustering columns: 'order' is used for 1 column, 'zorder' for fewer than 5 columns, and 'hilbert' for 5 or more columns.</td>
+    </tr>
+    </tbody>
+
+</table>
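+
+For example, with a Paimon catalog configured in Spark SQL, these options can be set when creating the table (a minimal sketch; the table name, columns, and clustering columns are illustrative):
+
+```sql
+CREATE TABLE T (f0 INT, f1 INT, f2 STRING, f3 STRING)
+TBLPROPERTIES (
+    'clustering.incremental' = 'true',
+    'clustering.columns' = 'f0,f1'
+);
+```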
+
+Once Incremental Clustering is enabled for a table, you can run it periodically in batch mode to continuously optimize the table's data layout and deliver better query performance.
+
+**Note**: Since regular compaction also rewrites files, it may disrupt the ordered data layout built by Incremental Clustering.
+Therefore, when Incremental Clustering is enabled, the table no longer supports write-time compaction or dedicated compaction;
+clustering and small-file merging must be performed exclusively via Incremental Clustering runs.
+
+## Run Incremental Clustering
+{{< hint info >}}
+
+Currently, Incremental Clustering can only be run in Spark; Flink support will be added in the near future.
+
+{{< /hint >}}
+
+To run an Incremental Clustering job, follow these instructions.
+
+{{< tabs "incremental-clustering" >}}
+
+{{< tab "Spark SQL" >}}
+
+Run the following SQL:
+
+```sql
+-- set the write parallelism; if it is too large, many small files may be generated
+SET spark.sql.shuffle.partitions=10;
+
+-- run incremental clustering
+CALL sys.compact(table => 'T')
+
+-- run incremental clustering in full mode, which reclusters all data
+CALL sys.compact(table => 'T', compact_strategy => 'full')
+```
+You don't need to specify any clustering-related parameters when running Incremental Clustering; they are already defined as table options.
+If you need to change the clustering settings, update the corresponding table options (see the example below).
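+
+For example, the clustering keys could be switched to other columns like this (a sketch; the column names are illustrative):
+
+```sql
+ALTER TABLE T SET TBLPROPERTIES ('clustering.columns' = 'f2,f3');
+```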
+{{< /tab >}}
+
+{{< /tabs >}}
+
+## Implementation
+
+To balance write amplification and sorting effectiveness, Paimon leverages the LSM-tree notion of levels to stratify data files and uses the Universal Compaction strategy to select files for clustering.
+- Newly written data lands in level-0; files in level-0 are unclustered.
+- All files in level-i (i > 0) were produced by sorting the same set of input files.
+- By analogy with Universal Compaction: in level-0, each file is a sorted run; in level-i, all files together constitute a single sorted run. During clustering, the sorted run is the basic unit of work.
+
+By introducing more levels, we can control the amount of data processed in each clustering run.
+Data at higher levels is more stably clustered and less likely to be rewritten, thereby mitigating write amplification while maintaining good sorting effectiveness.
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 10b2b4be41..2eed98adaa 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -146,6 +146,12 @@ under the License.
             <td>String</td>
             <td>Specifies the column name(s) used for comparison during range 
partitioning, in the format 'columnName1,columnName2'. If not set or set to an 
empty string, it indicates that the range partitioning feature is not enabled. 
This option will be effective only for append table without primary keys and 
batch execution mode.</td>
         </tr>
+        <tr>
+            <td><h5>clustering.incremental</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to enable incremental clustering.</td>
+        </tr>
         <tr>
             <td><h5>clustering.strategy</h5></td>
             <td style="word-wrap: break-word;">"auto"</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 42293bf5ed..500b16843d 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1933,7 +1933,12 @@ public class CoreOptions implements Serializable {
                                     + "in 'clustering.by-columns'. 'order' is 
used for 1 column, 'zorder' for less than 5 columns, "
                                     + "and 'hilbert' for 5 or more columns.");
 
-    @Immutable
+    public static final ConfigOption<Boolean> CLUSTERING_INCREMENTAL =
+            key("clustering.incremental")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("Whether to enable incremental clustering.");
+
     public static final ConfigOption<Boolean> ROW_TRACKING_ENABLED =
             key("row-tracking.enabled")
                     .booleanType()
@@ -3009,6 +3014,10 @@ public class CoreOptions implements Serializable {
         return clusteringColumns(options.get(CLUSTERING_COLUMNS));
     }
 
+    public boolean clusteringIncrementalEnabled() {
+        return options.get(CLUSTERING_INCREMENTAL);
+    }
+
     public OrderType clusteringStrategy(int columnSize) {
         return clusteringStrategy(options.get(CLUSTERING_STRATEGY), 
columnSize);
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
 
b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
new file mode 100644
index 0000000000..887150577c
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterManager.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append.cluster;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.compact.CompactUnit;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.mergetree.LevelSortedRun;
+import org.apache.paimon.mergetree.SortedRun;
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.SplitGenerator;
+import org.apache.paimon.table.source.snapshot.SnapshotReader;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.CoreOptions.CLUSTERING_INCREMENTAL;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** Manager for Incremental Clustering. */
+public class IncrementalClusterManager {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(IncrementalClusterManager.class);
+
+    private final SnapshotReader snapshotReader;
+
+    private final IncrementalClusterStrategy incrementalClusterStrategy;
+    private final CoreOptions.OrderType clusterCurve;
+    private final List<String> clusterKeys;
+
+    private int maxLevel;
+
+    public IncrementalClusterManager(FileStoreTable table) {
+        checkArgument(
+                table.bucketMode() == BucketMode.BUCKET_UNAWARE,
+                "only append unaware-bucket table support incremental 
clustering.");
+        // drop stats to reduce memory usage
+        this.snapshotReader = table.newSnapshotReader().dropStats();
+        CoreOptions options = table.coreOptions();
+        checkArgument(
+                options.clusteringIncrementalEnabled(),
+                "Only support incremental clustering when '%s' is true.",
+                CLUSTERING_INCREMENTAL.key());
+        this.incrementalClusterStrategy =
+                new IncrementalClusterStrategy(
+                        table.schemaManager(),
+                        options.clusteringColumns(),
+                        options.maxSizeAmplificationPercent(),
+                        options.sortedRunSizeRatio(),
+                        options.numSortedRunCompactionTrigger());
+        this.clusterCurve = 
options.clusteringStrategy(options.clusteringColumns().size());
+        this.clusterKeys = options.clusteringColumns();
+        this.maxLevel = options.numLevels();
+    }
+
+    public Map<BinaryRow, CompactUnit> prepareForCluster(boolean 
fullCompaction) {
+        // 1. construct LSM structure for each partition
+        Map<BinaryRow, List<LevelSortedRun>> partitionLevels = 
constructLevels();
+        if (LOG.isDebugEnabled()) {
+            partitionLevels.forEach(
+                    (partition, levelSortedRuns) -> {
+                        String runsInfo =
+                                levelSortedRuns.stream()
+                                        .map(
+                                                lsr ->
+                                                        String.format(
+                                                                "level-%s:%s",
+                                                                lsr.level(),
+                                                                
lsr.run().files().size()))
+                                        .collect(Collectors.joining(","));
+                        LOG.debug(
+                                "Partition {} has {} runs: [{}]",
+                                partition,
+                                levelSortedRuns.size(),
+                                runsInfo);
+                    });
+        }
+
+        // 2. pick files to be clustered for each partition
+        Map<BinaryRow, Optional<CompactUnit>> units =
+                partitionLevels.entrySet().stream()
+                        .collect(
+                                Collectors.toMap(
+                                        Map.Entry::getKey,
+                                        entry ->
+                                                
incrementalClusterStrategy.pick(
+                                                        maxLevel,
+                                                        entry.getValue(),
+                                                        fullCompaction)));
+
+        // 3. filter out empty units
+        Map<BinaryRow, CompactUnit> filteredUnits =
+                units.entrySet().stream()
+                        .filter(entry -> entry.getValue().isPresent())
+                        .collect(
+                                Collectors.toMap(
+                                        Map.Entry::getKey, entry -> 
entry.getValue().get()));
+        if (LOG.isDebugEnabled()) {
+            filteredUnits.forEach(
+                    (partition, compactUnit) -> {
+                        String filesInfo =
+                                compactUnit.files().stream()
+                                        .map(
+                                                file ->
+                                                        String.format(
+                                                                "%s,%s,%s",
+                                                                
file.fileName(),
+                                                                file.level(),
+                                                                
file.fileSize()))
+                                        .collect(Collectors.joining(", "));
+                        LOG.debug(
+                                "Partition {}, outputLevel:{}, clustered with 
{} files: [{}]",
+                                partition,
+                                compactUnit.outputLevel(),
+                                compactUnit.files().size(),
+                                filesInfo);
+                    });
+        }
+        return filteredUnits;
+    }
+
+    public Map<BinaryRow, List<LevelSortedRun>> constructLevels() {
+        List<DataSplit> dataSplits = snapshotReader.read().dataSplits();
+
+        maxLevel =
+                Math.max(
+                        maxLevel,
+                        dataSplits.stream()
+                                        .flatMap(split -> 
split.dataFiles().stream())
+                                        .mapToInt(DataFileMeta::level)
+                                        .max()
+                                        .orElse(-1)
+                                + 1);
+        checkArgument(maxLevel > 1, "Number of levels must be at least 2.");
+
+        Map<BinaryRow, List<DataFileMeta>> partitionFiles = new HashMap<>();
+        for (DataSplit dataSplit : dataSplits) {
+            partitionFiles
+                    .computeIfAbsent(dataSplit.partition(), k -> new 
ArrayList<>())
+                    .addAll(dataSplit.dataFiles());
+        }
+
+        return partitionFiles.entrySet().stream()
+                .collect(
+                        Collectors.toMap(
+                                Map.Entry::getKey,
+                                entry -> 
constructPartitionLevels(entry.getValue())));
+    }
+
+    public List<LevelSortedRun> constructPartitionLevels(List<DataFileMeta> 
partitionFiles) {
+        List<LevelSortedRun> partitionLevels = new ArrayList<>();
+        Map<Integer, List<DataFileMeta>> levelMap =
+                
partitionFiles.stream().collect(Collectors.groupingBy(DataFileMeta::level));
+
+        for (Map.Entry<Integer, List<DataFileMeta>> entry : 
levelMap.entrySet()) {
+            int level = entry.getKey();
+            if (level == 0) {
+                for (DataFileMeta level0File : entry.getValue()) {
+                    partitionLevels.add(
+                            new LevelSortedRun(level, 
SortedRun.fromSingle(level0File)));
+                }
+            } else {
+                // don't need to guarantee that the files within the same 
sorted run are
+                // non-overlapping here, so we call SortedRun.fromSorted() to 
avoid sorting and
+                // validation
+                partitionLevels.add(
+                        new LevelSortedRun(level, 
SortedRun.fromSorted(entry.getValue())));
+            }
+        }
+
+        // sort by level
+        partitionLevels.sort(Comparator.comparing(LevelSortedRun::level));
+        return partitionLevels;
+    }
+
+    public List<DataSplit> toSplits(BinaryRow partition, List<DataFileMeta> 
files) {
+        List<DataSplit> splits = new ArrayList<>();
+
+        DataSplit.Builder builder =
+                DataSplit.builder()
+                        .withPartition(partition)
+                        .withBucket(0)
+                        .withTotalBuckets(1)
+                        .isStreaming(false);
+
+        SplitGenerator splitGenerator = snapshotReader.splitGenerator();
+        List<SplitGenerator.SplitGroup> splitGroups = 
splitGenerator.splitForBatch(files);
+
+        for (SplitGenerator.SplitGroup splitGroup : splitGroups) {
+            List<DataFileMeta> dataFiles = splitGroup.files;
+            String bucketPath = 
snapshotReader.pathFactory().bucketPath(partition, 0).toString();
+            builder.withDataFiles(dataFiles)
+                    .rawConvertible(splitGroup.rawConvertible)
+                    .withBucketPath(bucketPath);
+
+            splits.add(builder.build());
+        }
+
+        return splits;
+    }
+
+    public List<DataFileMeta> upgrade(List<DataFileMeta> filesAfterCluster, 
int outputLevel) {
+        return filesAfterCluster.stream()
+                .map(file -> file.upgrade(outputLevel))
+                .collect(Collectors.toList());
+    }
+
+    public CoreOptions.OrderType clusterCurve() {
+        return clusterCurve;
+    }
+
+    public List<String> clusterKeys() {
+        return clusterKeys;
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterStrategy.java
 
b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterStrategy.java
new file mode 100644
index 0000000000..1a90c16ce3
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/append/cluster/IncrementalClusterStrategy.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append.cluster;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.compact.CompactUnit;
+import org.apache.paimon.mergetree.LevelSortedRun;
+import org.apache.paimon.mergetree.compact.UniversalCompaction;
+import org.apache.paimon.schema.SchemaManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Optional;
+
+/** Cluster strategy to decide which files to select for clustering. */
+public class IncrementalClusterStrategy {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(IncrementalClusterStrategy.class);
+
+    private final List<String> clusterKeys;
+    private final SchemaManager schemaManager;
+
+    private final UniversalCompaction universalCompaction;
+
+    public IncrementalClusterStrategy(
+            SchemaManager schemaManager,
+            List<String> clusterKeys,
+            int maxSizeAmp,
+            int sizeRatio,
+            int numRunCompactionTrigger) {
+        this.universalCompaction =
+                new UniversalCompaction(maxSizeAmp, sizeRatio, 
numRunCompactionTrigger, null, null);
+        this.clusterKeys = clusterKeys;
+        this.schemaManager = schemaManager;
+    }
+
+    public Optional<CompactUnit> pick(
+            int numLevels, List<LevelSortedRun> runs, boolean fullCompaction) {
+        if (fullCompaction) {
+            return pickFullCompaction(numLevels, runs);
+        }
+        return universalCompaction.pick(numLevels, runs);
+    }
+
+    public Optional<CompactUnit> pickFullCompaction(int numLevels, 
List<LevelSortedRun> runs) {
+        int maxLevel = numLevels - 1;
+        if (runs.isEmpty()) {
+            // no sorted run, no need to compact
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("no sorted run, no need to compact");
+            }
+            return Optional.empty();
+        }
+
+        if (runs.size() == 1 && runs.get(0).level() == maxLevel) {
+            long schemaId = runs.get(0).run().files().get(0).schemaId();
+            CoreOptions coreOptions = 
CoreOptions.fromMap(schemaManager.schema(schemaId).options());
+            // only one sorted run in the maxLevel with the same cluster key
+            if (coreOptions.clusteringColumns().equals(clusterKeys)) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(
+                            "only one sorted run in the maxLevel with the same 
cluster key, no need to compact");
+                }
+                return Optional.empty();
+            }
+        }
+
+        // full compaction
+        return Optional.of(CompactUnit.fromLevelRuns(maxLevel, runs));
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 9d73081805..09de891a37 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -239,6 +239,8 @@ public class SchemaValidation {
         validateMergeFunctionFactory(schema);
 
         validateRowTracking(schema, options);
+
+        validateIncrementalClustering(schema, options);
     }
 
     public static void validateFallbackBranch(SchemaManager schemaManager, 
TableSchema schema) {
@@ -648,4 +650,17 @@ public class SchemaValidation {
                     "Data evolution config must disabled with 
deletion-vectors.enabled");
         }
     }
+
+    private static void validateIncrementalClustering(TableSchema schema, 
CoreOptions options) {
+        if (options.clusteringIncrementalEnabled()) {
+            checkArgument(
+                    options.bucket() == -1,
+                    "Cannot define %s for incremental clustering  table, it 
only support bucket = -1",
+                    CoreOptions.BUCKET.key());
+            checkArgument(
+                    schema.primaryKeys().isEmpty(),
+                    "Cannot define %s for incremental clustering table.",
+                    PRIMARY_KEY.key());
+        }
+    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java
new file mode 100644
index 0000000000..8b37aff0b8
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append.cluster;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileSource;
+import org.apache.paimon.mergetree.LevelSortedRun;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link IncrementalClusterManager}. */
+public class IncrementalClusterManagerTest {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    @Test
+    public void testNonUnAwareBucketTable() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put(CoreOptions.BUCKET.key(), "1");
+        options.put(CoreOptions.BUCKET_KEY.key(), "f0");
+
+        assertThatThrownBy(() -> createTable(options, Collections.emptyList()))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining(
+                        "Cannot define bucket for incremental clustering  
table, it only support bucket = -1");
+    }
+
+    @Test
+    public void testNonClusterIncremental() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put(CoreOptions.BUCKET.key(), "-1");
+        options.put(CoreOptions.CLUSTERING_INCREMENTAL.key(), "false");
+        FileStoreTable table = createTable(options, Collections.emptyList());
+        assertThatThrownBy(() -> new IncrementalClusterManager(table))
+                .isInstanceOf(IllegalArgumentException.class)
+                .hasMessageContaining(
+                        "Only support incremental clustering when 
'clustering.incremental' is true.");
+    }
+
+    @Test
+    public void testConstructPartitionLevels() throws Exception {
+        // Create a valid table for IncrementalClusterManager
+        Map<String, String> options = new HashMap<>();
+        FileStoreTable table = createTable(options, Collections.emptyList());
+        IncrementalClusterManager incrementalClusterManager = new 
IncrementalClusterManager(table);
+
+        // Create test files with different levels
+        List<DataFileMeta> partitionFiles = new ArrayList<>();
+
+        // Level 0 files (should be individual LevelSortedRuns)
+        DataFileMeta level0File1 = createFile(100, 1, 0);
+        DataFileMeta level0File2 = createFile(200, 1, 0);
+        partitionFiles.add(level0File1);
+        partitionFiles.add(level0File2);
+
+        // Level 1 files (should be grouped into one LevelSortedRun)
+        DataFileMeta level1File1 = createFile(300, 1, 1);
+        DataFileMeta level1File2 = createFile(400, 1, 1);
+        partitionFiles.add(level1File1);
+        partitionFiles.add(level1File2);
+
+        // Level 2 files (should be grouped into one LevelSortedRun)
+        DataFileMeta level2File1 = createFile(500, 1, 2);
+        partitionFiles.add(level2File1);
+
+        // Call the method under test
+        List<LevelSortedRun> result =
+                
incrementalClusterManager.constructPartitionLevels(partitionFiles);
+
+        // Verify the results
+        assertThat(result).hasSize(4); // 2 level-0 runs + 1 level-1 run + 1 
level-2 run
+
+        // Verify sorting by level
+        assertThat(result.get(0).level()).isEqualTo(0);
+        assertThat(result.get(1).level()).isEqualTo(0);
+        assertThat(result.get(2).level()).isEqualTo(1);
+        assertThat(result.get(3).level()).isEqualTo(2);
+
+        // Verify level 0 files are individual runs
+        assertThat(result.get(0).run().files()).hasSize(1);
+        assertThat(result.get(1).run().files()).hasSize(1);
+
+        // Verify level 1 files are grouped together
+        assertThat(result.get(2).run().files()).hasSize(2);
+        
assertThat(result.get(2).run().files()).containsExactlyInAnyOrder(level1File1, 
level1File2);
+
+        // Verify level 2 file
+        assertThat(result.get(3).run().files()).hasSize(1);
+        assertThat(result.get(3).run().files()).containsExactly(level2File1);
+    }
+
+    @Test
+    public void testUpgrade() throws Exception {
+        // Create a valid table for IncrementalClusterManager
+        Map<String, String> options = new HashMap<>();
+        FileStoreTable table = createTable(options, Collections.emptyList());
+        IncrementalClusterManager incrementalClusterManager = new 
IncrementalClusterManager(table);
+
+        // Create test files with different levels
+        List<DataFileMeta> filesAfterCluster = new ArrayList<>();
+        DataFileMeta file1 = createFile(100, 1, 0);
+        DataFileMeta file2 = createFile(200, 1, 1);
+        DataFileMeta file3 = createFile(300, 1, 2);
+        filesAfterCluster.add(file1);
+        filesAfterCluster.add(file2);
+        filesAfterCluster.add(file3);
+
+        // Test upgrading to level 3
+        int outputLevel = 3;
+        List<DataFileMeta> upgradedFiles =
+                incrementalClusterManager.upgrade(filesAfterCluster, 
outputLevel);
+
+        // Verify the results
+        assertThat(upgradedFiles).hasSize(3);
+
+        // Verify all files are upgraded to the specified output level
+        for (DataFileMeta upgradedFile : upgradedFiles) {
+            assertThat(upgradedFile.level()).isEqualTo(outputLevel);
+        }
+    }
+
+    private FileStoreTable createTable(
+            Map<String, String> customOptions, List<String> partitionKeys) 
throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put(CoreOptions.BUCKET.key(), "-1");
+        options.put(CoreOptions.CLUSTERING_COLUMNS.key(), "f0,f1");
+        options.put(CoreOptions.CLUSTERING_INCREMENTAL.key(), "true");
+        options.putAll(customOptions);
+
+        Schema schema =
+                new Schema(
+                        RowType.of(
+                                        DataTypes.INT(),
+                                        DataTypes.INT(),
+                                        DataTypes.STRING(),
+                                        DataTypes.STRING())
+                                .getFields(),
+                        partitionKeys,
+                        Collections.emptyList(),
+                        options,
+                        "");
+
+        SchemaManager schemaManager =
+                new SchemaManager(LocalFileIO.create(), new 
Path(tempDir.toString()));
+        return FileStoreTableFactory.create(
+                LocalFileIO.create(),
+                new Path(tempDir.toString()),
+                schemaManager.createTable(schema));
+    }
+
+    private static DataFileMeta createFile(long size, long schemaId, int 
level) {
+        return DataFileMeta.create(
+                "",
+                size,
+                1,
+                null,
+                null,
+                null,
+                null,
+                0,
+                0,
+                schemaId,
+                level,
+                null,
+                null,
+                FileSource.APPEND,
+                null,
+                null,
+                null);
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterStrategyTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterStrategyTest.java
new file mode 100644
index 0000000000..1061c50c9e
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterStrategyTest.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append.cluster;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.compact.CompactUnit;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileSource;
+import org.apache.paimon.mergetree.LevelSortedRun;
+import org.apache.paimon.mergetree.SortedRun;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VarCharType;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static java.util.Collections.emptyList;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link IncrementalClusterStrategy}. */
+public class IncrementalClusterStrategyTest {
+
+    @TempDir static java.nio.file.Path tempDir;
+
+    private static SchemaManager schemaManager;
+    private static IncrementalClusterStrategy incrementalClusterStrategy;
+
+    @BeforeAll
+    public static void setUp() throws Exception {
+        schemaManager = new SchemaManager(LocalFileIO.create(), new 
Path(tempDir.toString()));
+        prepareSchema();
+        incrementalClusterStrategy =
+                new IncrementalClusterStrategy(schemaManager, 
Arrays.asList("f0", "f1"), 25, 1, 3);
+    }
+
+    @Test
+    public void testPickFullCompactionWithEmptyRuns() {
+        // Test case: empty runs should return empty
+        Optional<CompactUnit> result =
+                incrementalClusterStrategy.pickFullCompaction(3, 
Collections.emptyList());
+        assertThat(result.isPresent()).isFalse();
+    }
+
+    @Test
+    public void testPickFullCompactionWithSingleRunSameClusterKey() {
+        // Test case: single run at max level with same cluster key should 
return empty
+        // Using schema-0 which has clustering columns "f0,f1" (same as 
clusterKeys)
+        int maxLevel = 2;
+        DataFileMeta file = createFile(1, 0L, maxLevel);
+        LevelSortedRun run = new LevelSortedRun(maxLevel, 
SortedRun.fromSingle(file));
+        List<LevelSortedRun> runs = Collections.singletonList(run);
+
+        Optional<CompactUnit> result = 
incrementalClusterStrategy.pickFullCompaction(3, runs);
+        assertThat(result.isPresent()).isFalse();
+    }
+
+    @Test
+    public void testPickFullCompactionWithSingleRunDifferentClusterKey() {
+        // Test case: single run at max level with different cluster key 
should return compaction
+        // Using schema-1 which has clustering columns "f2,f3" (different from 
clusterKeys "f0,f1")
+        int maxLevel = 2;
+        DataFileMeta file = createFile(1, 1L, maxLevel); // Use schema-1
+        LevelSortedRun run = new LevelSortedRun(maxLevel, 
SortedRun.fromSingle(file));
+        List<LevelSortedRun> runs = Collections.singletonList(run);
+
+        Optional<CompactUnit> result = 
incrementalClusterStrategy.pickFullCompaction(3, runs);
+        assertThat(result.isPresent()).isTrue();
+        assertThat(result.get().outputLevel()).isEqualTo(maxLevel);
+        assertThat(result.get().files()).hasSize(1);
+        assertThat(result.get().files().get(0).fileSize()).isEqualTo(1);
+    }
+
+    @Test
+    public void testPickFullCompactionWithSingleRunNotAtMaxLevel() {
+        // Test case: single run not at max level should return compaction
+        int maxLevel = 2;
+        int runLevel = 1;
+        DataFileMeta file = createFile(1, 0L, runLevel);
+        LevelSortedRun run = new LevelSortedRun(runLevel, 
SortedRun.fromSingle(file));
+        List<LevelSortedRun> runs = Collections.singletonList(run);
+
+        Optional<CompactUnit> result = 
incrementalClusterStrategy.pickFullCompaction(3, runs);
+        assertThat(result.isPresent()).isTrue();
+        assertThat(result.get().outputLevel()).isEqualTo(maxLevel);
+        assertThat(result.get().files()).hasSize(1);
+        assertThat(result.get().files().get(0).fileSize()).isEqualTo(1);
+    }
+
+    @Test
+    public void testPickFullCompactionWithMultipleRuns() {
+        // Test case: multiple runs should return compaction
+        int maxLevel = 2;
+        DataFileMeta file1 = createFile(1, 0L, 0);
+        DataFileMeta file2 = createFile(2, 1L, 1);
+        DataFileMeta file3 = createFile(3, 0L, maxLevel);
+
+        LevelSortedRun run1 = new LevelSortedRun(0, 
SortedRun.fromSingle(file1));
+        LevelSortedRun run2 = new LevelSortedRun(1, 
SortedRun.fromSingle(file2));
+        LevelSortedRun run3 = new LevelSortedRun(maxLevel, 
SortedRun.fromSingle(file3));
+
+        List<LevelSortedRun> runs = Arrays.asList(run1, run2, run3);
+
+        Optional<CompactUnit> result = 
incrementalClusterStrategy.pickFullCompaction(3, runs);
+        assertThat(result.isPresent()).isTrue();
+        assertThat(result.get().outputLevel()).isEqualTo(maxLevel);
+        assertThat(result.get().files()).hasSize(3);
+
+        long[] fileSizes =
+                
result.get().files().stream().mapToLong(DataFileMeta::fileSize).toArray();
+        assertThat(fileSizes).isEqualTo(new long[] {1, 2, 3});
+    }
+
+    @Test
+    public void testPickFullCompactionWithDifferentNumLevels() {
+        // Test case: different number of levels
+        DataFileMeta file1 = createFile(1, 0L, 0);
+        DataFileMeta file2 = createFile(2, 1L, 1);
+
+        LevelSortedRun run1 = new LevelSortedRun(0, 
SortedRun.fromSingle(file1));
+        LevelSortedRun run2 = new LevelSortedRun(1, 
SortedRun.fromSingle(file2));
+
+        List<LevelSortedRun> runs = Arrays.asList(run1, run2);
+
+        // Test with numLevels = 5, maxLevel should be 4
+        Optional<CompactUnit> result = 
incrementalClusterStrategy.pickFullCompaction(5, runs);
+        assertThat(result.isPresent()).isTrue();
+        assertThat(result.get().outputLevel()).isEqualTo(4); // maxLevel = 
numLevels - 1
+        assertThat(result.get().files()).hasSize(2);
+    }
+
+    @Test
+    public void testPickFullCompactionWithMixedSchemas() {
+        // Test case: runs with mixed schemas (some same, some different 
cluster keys)
+        int maxLevel = 2;
+        DataFileMeta file1 = createFile(1, 0L, 0); // schema-0: f0,f1 (same as 
clusterKeys)
+        DataFileMeta file2 = createFile(2, 1L, 1); // schema-1: f2,f3 
(different from clusterKeys)
+        DataFileMeta file3 = createFile(3, 0L, maxLevel); // schema-0: f0,f1 
(same as clusterKeys)
+
+        LevelSortedRun run1 = new LevelSortedRun(0, 
SortedRun.fromSingle(file1));
+        LevelSortedRun run2 = new LevelSortedRun(1, 
SortedRun.fromSingle(file2));
+        LevelSortedRun run3 = new LevelSortedRun(maxLevel, 
SortedRun.fromSingle(file3));
+
+        List<LevelSortedRun> runs = Arrays.asList(run1, run2, run3);
+
+        Optional<CompactUnit> result = 
incrementalClusterStrategy.pickFullCompaction(3, runs);
+        assertThat(result.isPresent()).isTrue();
+        assertThat(result.get().outputLevel()).isEqualTo(maxLevel);
+        assertThat(result.get().files()).hasSize(3);
+    }
+
+    private static void prepareSchema() throws Exception {
+        // schema-0
+        Map<String, String> options = new HashMap<>();
+        options.put(CoreOptions.CLUSTERING_COLUMNS.key(), "f0,f1");
+        Schema schema =
+                new Schema(
+                        RowType.of(
+                                        VarCharType.STRING_TYPE,
+                                        VarCharType.STRING_TYPE,
+                                        VarCharType.STRING_TYPE,
+                                        VarCharType.STRING_TYPE)
+                                .getFields(),
+                        emptyList(),
+                        emptyList(),
+                        options,
+                        "");
+        schemaManager.createTable(schema);
+        // schema-1
+        schemaManager.commitChanges(
+                SchemaChange.setOption(CoreOptions.CLUSTERING_COLUMNS.key(), 
"f2,f3"));
+    }
+
+    private static DataFileMeta createFile(long size, long schemaId, int 
level) {
+        return DataFileMeta.create(
+                "",
+                size,
+                1,
+                null,
+                null,
+                null,
+                null,
+                0,
+                0,
+                schemaId,
+                level,
+                null,
+                null,
+                FileSource.APPEND,
+                null,
+                null,
+                null);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 0effc8807f..43a3e68971 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -101,6 +101,10 @@ public class CompactAction extends TableActionBase {
         checkArgument(
                 !((FileStoreTable) table).coreOptions().dataEvolutionEnabled(),
                 "Compact action does not support data evolution table yet. ");
+        checkArgument(
+                !(((FileStoreTable) table).bucketMode() == 
BucketMode.BUCKET_UNAWARE
+                        && ((FileStoreTable) 
table).coreOptions().clusteringIncrementalEnabled()),
+                "The table has enabled incremental clustering, and do not 
support compact in flink yet.");
         HashMap<String, String> dynamicOptions = new HashMap<>(tableConf);
         dynamicOptions.put(CoreOptions.WRITE_ONLY.key(), "false");
         table = table.copy(dynamicOptions);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendTableSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendTableSink.java
index b95318df79..ccad6b35ca 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendTableSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendTableSink.java
@@ -23,6 +23,7 @@ import 
org.apache.paimon.flink.compact.AppendPreCommitCompactCoordinatorOperator
 import org.apache.paimon.flink.compact.AppendPreCommitCompactWorkerOperator;
 import org.apache.paimon.flink.source.AppendBypassCoordinateOperatorFactory;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
@@ -98,7 +99,10 @@ public abstract class AppendTableSink<T> extends 
FlinkWriteSink<T> {
         }
 
         boolean enableCompaction =
-                !table.coreOptions().writeOnly() && 
!table.coreOptions().dataEvolutionEnabled();
+                !table.coreOptions().writeOnly()
+                        && !table.coreOptions().dataEvolutionEnabled()
+                        && !(table.bucketMode() == BucketMode.BUCKET_UNAWARE
+                                && 
table.coreOptions().clusteringIncrementalEnabled());
         boolean isStreamingMode =
                 input.getExecutionEnvironment()
                                 .getConfiguration()
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index 3a33fe008b..299fb6f6d7 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -23,8 +23,13 @@ import org.apache.paimon.CoreOptions.OrderType;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.append.AppendCompactCoordinator;
 import org.apache.paimon.append.AppendCompactTask;
+import org.apache.paimon.append.cluster.IncrementalClusterManager;
+import org.apache.paimon.compact.CompactUnit;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataIncrement;
 import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.operation.BaseAppendFileStoreWrite;
 import org.apache.paimon.partition.PartitionPredicate;
@@ -42,6 +47,7 @@ import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.BatchTableWrite;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
 import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.table.sink.CommitMessageSerializer;
 import org.apache.paimon.table.sink.TableCommitImpl;
 import org.apache.paimon.table.source.DataSplit;
@@ -92,6 +98,9 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
 import static org.apache.paimon.CoreOptions.createCommitUser;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.spark.sql.types.DataTypes.StringType;
@@ -146,8 +155,7 @@ public class CompactProcedure extends BaseProcedure {
     public InternalRow[] call(InternalRow args) {
         Identifier tableIdent = toIdentifier(args.getString(0), 
PARAMETERS[0].name());
         String partitions = blank(args, 1) ? null : args.getString(1);
-        // make full compact strategy as default.
-        String compactStrategy = blank(args, 2) ? FULL : args.getString(2);
+        String compactStrategy = blank(args, 2) ? null : args.getString(2);
         String sortType = blank(args, 3) ? OrderType.NONE.name() : 
args.getString(3);
         List<String> sortColumns =
                 blank(args, 4)
@@ -166,7 +174,9 @@ public class CompactProcedure extends BaseProcedure {
                     "sort compact do not support 'partition_idle_time'.");
         }
 
-        if (!(compactStrategy.equalsIgnoreCase(FULL) || 
compactStrategy.equalsIgnoreCase(MINOR))) {
+        if (!(compactStrategy == null
+                || compactStrategy.equalsIgnoreCase(FULL)
+                || compactStrategy.equalsIgnoreCase(MINOR))) {
             throw new IllegalArgumentException(
                     String.format(
                             "The compact strategy only supports 'full' or 
'minor', but '%s' is configured.",
@@ -205,6 +215,12 @@ public class CompactProcedure extends BaseProcedure {
                             dynamicOptions, CoreOptions.WRITE_ONLY.key(), 
"false");
                     ProcedureUtils.putAllOptions(dynamicOptions, options);
                     table = table.copy(dynamicOptions);
+                    if (((FileStoreTable) 
table).coreOptions().clusteringIncrementalEnabled()
+                            && (!OrderType.NONE.name().equals(sortType))) {
+                        throw new IllegalArgumentException(
+                                "The table has enabled incremental clustering, 
do not support sort compact.");
+                    }
+
                     InternalRow internalRow =
                             newInternalRow(
                                     execute(
@@ -238,6 +254,13 @@ public class CompactProcedure extends BaseProcedure {
             @Nullable Duration partitionIdleTime) {
         BucketMode bucketMode = table.bucketMode();
         OrderType orderType = OrderType.of(sortType);
+
+        boolean clusterIncrementalEnabled = 
table.coreOptions().clusteringIncrementalEnabled();
+        if (compactStrategy == null) {
+            // default to the full compact strategy for regular compaction,
+            // and to the minor (non-full) strategy for incremental clustering.
+            compactStrategy = clusterIncrementalEnabled ? MINOR : FULL;
+        }
         boolean fullCompact = compactStrategy.equalsIgnoreCase(FULL);
         RowType partitionType = table.schema().logicalPartitionType();
         Predicate filter =
@@ -251,6 +274,7 @@ public class CompactProcedure extends BaseProcedure {
                                 .getOrElse(null);
         PartitionPredicate partitionPredicate =
                 PartitionPredicate.fromPredicate(partitionType, filter);
+
         if (orderType.equals(OrderType.NONE)) {
             JavaSparkContext javaSparkContext = new 
JavaSparkContext(spark().sparkContext());
             switch (bucketMode) {
@@ -264,8 +288,12 @@ public class CompactProcedure extends BaseProcedure {
                             javaSparkContext);
                     break;
                 case BUCKET_UNAWARE:
-                    compactUnAwareBucketTable(
-                            table, partitionPredicate, partitionIdleTime, 
javaSparkContext);
+                    if (clusterIncrementalEnabled) {
+                        clusterIncrementalUnAwareBucketTable(table, 
fullCompact, relation);
+                    } else {
+                        compactUnAwareBucketTable(
+                                table, partitionPredicate, partitionIdleTime, 
javaSparkContext);
+                    }
                     break;
                 default:
                     throw new UnsupportedOperationException(
@@ -521,6 +549,99 @@ public class CompactProcedure extends BaseProcedure {
         }
     }
 
+    private void clusterIncrementalUnAwareBucketTable(
+            FileStoreTable table, boolean fullCompaction, DataSourceV2Relation relation) {
+        IncrementalClusterManager incrementalClusterManager = new IncrementalClusterManager(table);
+        Map<BinaryRow, CompactUnit> compactUnits =
+                incrementalClusterManager.prepareForCluster(fullCompaction);
+
+        // generate splits for each partition
+        Map<BinaryRow, DataSplit[]> partitionSplits =
+                compactUnits.entrySet().stream()
+                        .collect(
+                                Collectors.toMap(
+                                        Map.Entry::getKey,
+                                        entry ->
+                                                incrementalClusterManager
+                                                        .toSplits(
+                                                                entry.getKey(),
+                                                                entry.getValue().files())
+                                                        .toArray(new DataSplit[0])));
+
+        // sort within each partition
+        TableSorter sorter =
+                TableSorter.getSorter(
+                        table,
+                        incrementalClusterManager.clusterCurve(),
+                        incrementalClusterManager.clusterKeys());
+        LOG.info(
+                "Start to sort within partitions, cluster curve is {}, cluster keys are {}",
+                incrementalClusterManager.clusterCurve(),
+                incrementalClusterManager.clusterKeys());
+
+        Dataset<Row> datasetForWrite =
+                partitionSplits.values().stream()
+                        .map(
+                                split -> {
+                                    Dataset<Row> dataset =
+                                            PaimonUtils.createDataset(
+                                                    spark(),
+                                                    ScanPlanHelper$.MODULE$.createNewScanPlan(
+                                                            split, relation));
+                                    return sorter.sort(dataset);
+                                })
+                        .reduce(Dataset::union)
+                        .orElse(null);
+        if (datasetForWrite != null) {
+            // Use write-only mode so that writing the clustered data does not trigger compaction.
+            // Do not use overwrite mode: only the selected files are replaced, not whole partitions.
+            PaimonSparkWriter writer = PaimonSparkWriter.apply(table).writeOnly();
+            Seq<CommitMessage> commitMessages = writer.write(datasetForWrite);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Commit messages after writing: {}", commitMessages);
+            }
+
+            // re-organize the commit messages to generate the compact messages
+            Map<BinaryRow, List<DataFileMeta>> partitionClustered = new HashMap<>();
+            for (CommitMessage commitMessage : JavaConverters.seqAsJavaList(commitMessages)) {
+                checkArgument(commitMessage.bucket() == 0);
+                partitionClustered
+                        .computeIfAbsent(commitMessage.partition(), k -> new ArrayList<>())
+                        .addAll(((CommitMessageImpl) commitMessage).newFilesIncrement().newFiles());
+            }
+
+            List<CommitMessage> clusterMessages = new ArrayList<>();
+            for (Map.Entry<BinaryRow, List<DataFileMeta>> entry : partitionClustered.entrySet()) {
+                BinaryRow partition = entry.getKey();
+                List<DataFileMeta> clusterBefore = compactUnits.get(partition).files();
+                // upgrade the clustered files to the output level
+                List<DataFileMeta> clusterAfter =
+                        incrementalClusterManager.upgrade(
+                                entry.getValue(), compactUnits.get(partition).outputLevel());
+                LOG.info(
+                        "Partition {}: upgrade file level to {}",
+                        partition,
+                        compactUnits.get(partition).outputLevel());
+                CompactIncrement compactIncrement =
+                        new CompactIncrement(clusterBefore, clusterAfter, Collections.emptyList());
+                clusterMessages.add(
+                        new CommitMessageImpl(
+                                partition,
+                                // bucket 0 is the bucket used for unaware-bucket tables,
+                                // kept for compatibility with the old design
+                                0,
+                                table.coreOptions().bucket(),
+                                DataIncrement.emptyIncrement(),
+                                compactIncrement));
+            }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Commit messages after reorganizing: {}", clusterMessages);
+            }
+
+            writer.commit(JavaConverters.asScalaBuffer(clusterMessages).toSeq());
+        }
+    }
+
     private Map<BinaryRow, DataSplit[]> packForSort(List<DataSplit> dataSplits) {
         // Make a single partition as a compact group
         return dataSplits.stream()
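
A minimal sketch of how this path can be exercised from Spark SQL, assuming a Spark session with the Paimon catalog registered as `paimon` and a hypothetical table name `t`; the table options mirror those used in the tests further below.

    // Illustrative sketch only: assumes the Paimon Spark catalog is configured
    // and uses a hypothetical table `t`.
    spark.sql(
      """CREATE TABLE t (a INT, b INT, c STRING)
        |TBLPROPERTIES (
        |  'bucket' = '-1',
        |  'clustering.columns' = 'a,b',
        |  'clustering.strategy' = 'zorder',
        |  'clustering.incremental' = 'true')
        |""".stripMargin)
    spark.sql("INSERT INTO t VALUES (0, 0, 'x'), (1, 1, 'y'), (2, 2, 'z')")
    // With 'clustering.incremental' enabled, compact_strategy defaults to 'minor',
    // so only a subset of files is rewritten; 'full' forces a complete re-clustering.
    spark.sql("CALL paimon.sys.compact(table => 't')")
    spark.sql("CALL paimon.sys.compact(table => 't', compact_strategy => 'full')")
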
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index 2f6b743f5c..246245c052 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -258,7 +258,7 @@ case class PaimonSparkWriter(table: FileStoreTable, writeRowTracking: Boolean =
           }
         }
         val clusteringColumns = coreOptions.clusteringColumns()
-        if (!clusteringColumns.isEmpty) {
+        if ((!coreOptions.clusteringIncrementalEnabled()) && (!clusteringColumns.isEmpty)) {
           val strategy = coreOptions.clusteringStrategy(tableSchema.fields().size())
           val sorter = TableSorter.getSorter(table, strategy, clusteringColumns)
           input = sorter.sort(data)
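
With this guard, enabling 'clustering.incremental' turns regular writes into plain appends; clustering is deferred entirely to the compact procedure above. A small sketch of how the resulting layout can be inspected, assuming `table` is an already-loaded FileStoreTable (as in the tests below):

    // Sketch: print file levels per partition after a clustering run.
    // Clustered output is upgraded to a higher level; fresh appends stay at level 0.
    val splits = table.newSnapshotReader().read().dataSplits()
    splits.forEach { split =>
      split.dataFiles().forEach { file =>
        println(s"partition=${split.partition()} level=${file.level()}")
      }
    }
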
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
index 8d1b35cc12..96f85ba757 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
@@ -24,7 +24,7 @@ import org.apache.paimon.spark.PaimonSparkTestBase
 import org.apache.paimon.table.FileStoreTable
 import org.apache.paimon.table.source.DataSplit
 
-import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerStageCompleted, SparkListenerStageSubmitted}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerStageSubmitted}
 import org.apache.spark.sql.{Dataset, Row}
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.streaming.StreamTest
@@ -33,6 +33,7 @@ import org.assertj.core.api.Assertions
 import java.util
 
 import scala.collection.JavaConverters._
+import scala.util.Random
 
 /** Test compact procedure. See [[CompactProcedure]]. */
 abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamTest {
@@ -795,6 +796,340 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT
     }
   }
 
+  test("Paimon Procedure: cluster for unpartitioned table") {
+    failAfter(streamingTimeout) {
+      withTempDir {
+        checkpointDir =>
+          spark.sql(
+            s"""
+               |CREATE TABLE T (a INT, b INT, c STRING)
+               |TBLPROPERTIES ('bucket'='-1','num-levels'='6', 'num-sorted-run.compaction-trigger'='2', 'clustering.columns'='a,b', 'clustering.strategy'='zorder', 'clustering.incremental' = 'true')
+               |""".stripMargin)
+          val location = loadTable("T").location().toString
+
+          val inputData = MemoryStream[(Int, Int, String)]
+          val stream = inputData
+            .toDS()
+            .toDF("a", "b", "c")
+            .writeStream
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .foreachBatch {
+              (batch: Dataset[Row], _: Long) =>
+                batch.write.format("paimon").mode("append").save(location)
+            }
+            .start()
+
+          val query = () => spark.sql("SELECT * FROM T")
+
+          try {
+            val random = new Random()
+            val randomStr = random.nextString(40)
+            // first write
+            inputData.addData((0, 0, randomStr))
+            inputData.addData((0, 1, randomStr))
+            inputData.addData((0, 2, randomStr))
+            inputData.addData((1, 0, randomStr))
+            inputData.addData((1, 1, randomStr))
+            inputData.addData((1, 2, randomStr))
+            inputData.addData((2, 0, randomStr))
+            inputData.addData((2, 1, randomStr))
+            inputData.addData((2, 2, randomStr))
+            stream.processAllAvailable()
+
+            val result = new util.ArrayList[Row]()
+            for (a <- 0 until 3) {
+              for (b <- 0 until 3) {
+                result.add(Row(a, b, randomStr))
+              }
+            }
+            
Assertions.assertThat(query().collect()).containsExactlyElementsOf(result)
+
+            // first cluster, the outputLevel should be 5
+            checkAnswer(spark.sql("CALL paimon.sys.compact(table => 'T')"), 
Row(true) :: Nil)
+
+            // first cluster result
+            val result2 = new util.ArrayList[Row]()
+            result2.add(0, Row(0, 0, randomStr))
+            result2.add(1, Row(0, 1, randomStr))
+            result2.add(2, Row(1, 0, randomStr))
+            result2.add(3, Row(1, 1, randomStr))
+            result2.add(4, Row(0, 2, randomStr))
+            result2.add(5, Row(1, 2, randomStr))
+            result2.add(6, Row(2, 0, randomStr))
+            result2.add(7, Row(2, 1, randomStr))
+            result2.add(8, Row(2, 2, randomStr))
+
+            
Assertions.assertThat(query().collect()).containsExactlyElementsOf(result2)
+
+            var clusteredTable = loadTable("T")
+            checkSnapshot(clusteredTable)
+            var dataSplits = clusteredTable.newSnapshotReader().read().dataSplits()
+            Assertions.assertThat(dataSplits.size()).isEqualTo(1)
+            Assertions.assertThat(dataSplits.get(0).dataFiles().size()).isEqualTo(1)
+            Assertions.assertThat(dataSplits.get(0).dataFiles().get(0).level()).isEqualTo(5)
+
+            // second write
+            inputData.addData((0, 3, null), (1, 3, null), (2, 3, null))
+            inputData.addData((3, 0, null), (3, 1, null), (3, 2, null), (3, 3, 
null))
+            stream.processAllAvailable()
+
+            val result3 = new util.ArrayList[Row]()
+            result3.addAll(result2)
+            for (a <- 0 until 3) {
+              result3.add(Row(a, 3, null))
+            }
+            for (b <- 0 until 4) {
+              result3.add(Row(3, b, null))
+            }
+
+            
Assertions.assertThat(query().collect()).containsExactlyElementsOf(result3)
+
+            // second cluster, the outputLevel should be 4
+            checkAnswer(spark.sql("CALL paimon.sys.compact(table => 'T')"), 
Row(true) :: Nil)
+            // second cluster result, level-5 and level-4 are individually ordered
+            val result4 = new util.ArrayList[Row]()
+            result4.addAll(result2)
+            result4.add(Row(0, 3, null))
+            result4.add(Row(1, 3, null))
+            result4.add(Row(3, 0, null))
+            result4.add(Row(3, 1, null))
+            result4.add(Row(2, 3, null))
+            result4.add(Row(3, 2, null))
+            result4.add(Row(3, 3, null))
+            
Assertions.assertThat(query().collect()).containsExactlyElementsOf(result4)
+
+            clusteredTable = loadTable("T")
+            checkSnapshot(clusteredTable)
+            dataSplits = clusteredTable.newSnapshotReader().read().dataSplits()
+            Assertions.assertThat(dataSplits.size()).isEqualTo(1)
+            Assertions.assertThat(dataSplits.get(0).dataFiles().size()).isEqualTo(2)
+            Assertions.assertThat(dataSplits.get(0).dataFiles().get(0).level()).isEqualTo(5)
+            Assertions.assertThat(dataSplits.get(0).dataFiles().get(1).level()).isEqualTo(4)
+
+            // full cluster
+            checkAnswer(
+              spark.sql("CALL paimon.sys.compact(table => 'T', 
compact_strategy => 'full')"),
+              Row(true) :: Nil)
+            val result5 = new util.ArrayList[Row]()
+            result5.add(Row(0, 0, randomStr))
+            result5.add(Row(0, 1, randomStr))
+            result5.add(Row(1, 0, randomStr))
+            result5.add(Row(1, 1, randomStr))
+            result5.add(Row(0, 2, randomStr))
+            result5.add(Row(0, 3, null))
+            result5.add(Row(1, 2, randomStr))
+            result5.add(Row(1, 3, null))
+            result5.add(Row(2, 0, randomStr))
+            result5.add(Row(2, 1, randomStr))
+            result5.add(Row(3, 0, null))
+            result5.add(Row(3, 1, null))
+            result5.add(Row(2, 2, randomStr))
+            result5.add(Row(2, 3, null))
+            result5.add(Row(3, 2, null))
+            result5.add(Row(3, 3, null))
+            
Assertions.assertThat(query().collect()).containsExactlyElementsOf(result5)
+
+            clusteredTable = loadTable("T")
+            checkSnapshot(clusteredTable)
+            dataSplits = clusteredTable.newSnapshotReader().read().dataSplits()
+            Assertions.assertThat(dataSplits.size()).isEqualTo(1)
+            Assertions.assertThat(dataSplits.get(0).dataFiles().size()).isEqualTo(1)
+            Assertions.assertThat(dataSplits.get(0).dataFiles().get(0).level()).isEqualTo(5)
+
+          } finally {
+            stream.stop()
+          }
+      }
+    }
+  }
+
+  test("Paimon Procedure: cluster for partitioned table") {
+    failAfter(streamingTimeout) {
+      withTempDir {
+        checkpointDir =>
+          spark.sql(
+            s"""
+               |CREATE TABLE T (a INT, b INT, c STRING, pt INT)
+               |PARTITIONED BY (pt)
+               |TBLPROPERTIES ('bucket'='-1', 'num-levels'='6', 'num-sorted-run.compaction-trigger'='2', 'clustering.columns'='a,b', 'clustering.strategy'='zorder', 'clustering.incremental' = 'true')
+               |""".stripMargin)
+          val location = loadTable("T").location().toString
+
+          val inputData = MemoryStream[(Int, Int, String, Int)]
+          val stream = inputData
+            .toDS()
+            .toDF("a", "b", "c", "pt")
+            .writeStream
+            .option("checkpointLocation", checkpointDir.getCanonicalPath)
+            .foreachBatch {
+              (batch: Dataset[Row], _: Long) =>
+                batch.write.format("paimon").mode("append").save(location)
+            }
+            .start()
+
+          val query = () => spark.sql("SELECT * FROM T ORDER BY pt")
+
+          try {
+            val random = new Random()
+            val randomStr = random.nextString(50)
+            // first write
+            for (pt <- 0 until 2) {
+              val c = if (pt == 0) randomStr else null
+              inputData.addData((0, 0, c, pt))
+              inputData.addData((0, 1, c, pt))
+              inputData.addData((0, 2, c, pt))
+              inputData.addData((1, 0, c, pt))
+              inputData.addData((1, 1, c, pt))
+              inputData.addData((1, 2, c, pt))
+              inputData.addData((2, 0, c, pt))
+              inputData.addData((2, 1, c, pt))
+              inputData.addData((2, 2, c, pt))
+            }
+            stream.processAllAvailable()
+
+            val result = new util.ArrayList[Row]()
+            for (pt <- 0 until 2) {
+              for (a <- 0 until 3) {
+                for (b <- 0 until 3) {
+                  val c = if (pt == 0) randomStr else null
+                  result.add(Row(a, b, c, pt))
+                }
+              }
+            }
+            
Assertions.assertThat(query().collect()).containsExactlyElementsOf(result)
+
+            // first cluster, the outputLevel should be 5
+            checkAnswer(spark.sql("CALL paimon.sys.compact(table => 'T')"), 
Row(true) :: Nil)
+
+            // first cluster result
+            val result2 = new util.ArrayList[Row]()
+            for (pt <- 0 until 2) {
+              val c = if (pt == 0) randomStr else null
+              result2.add(Row(0, 0, c, pt))
+              result2.add(Row(0, 1, c, pt))
+              result2.add(Row(1, 0, c, pt))
+              result2.add(Row(1, 1, c, pt))
+              result2.add(Row(0, 2, c, pt))
+              result2.add(Row(1, 2, c, pt))
+              result2.add(Row(2, 0, c, pt))
+              result2.add(Row(2, 1, c, pt))
+              result2.add(Row(2, 2, c, pt))
+            }
+
+            
Assertions.assertThat(query().collect()).containsExactlyElementsOf(result2)
+
+            var clusteredTable = loadTable("T")
+            checkSnapshot(clusteredTable)
+            var dataSplits = 
clusteredTable.newSnapshotReader().read().dataSplits()
+            Assertions.assertThat(dataSplits.size()).isEqualTo(2)
+            dataSplits.forEach(
+              dataSplit => {
+                Assertions.assertThat(dataSplit.dataFiles().size()).isEqualTo(1)
+                Assertions.assertThat(dataSplit.dataFiles().get(0).level()).isEqualTo(5)
+              })
+
+            // second write
+            for (pt <- 0 until 2) {
+              inputData.addData((0, 3, null, pt), (1, 3, null, pt), (2, 3, 
null, pt))
+              inputData.addData(
+                (3, 0, null, pt),
+                (3, 1, null, pt),
+                (3, 2, null, pt),
+                (3, 3, null, pt))
+            }
+            stream.processAllAvailable()
+
+            val result3 = new util.ArrayList[Row]()
+            for (pt <- 0 until 2) {
+              val c = if (pt == 0) randomStr else null
+              result3.add(Row(0, 0, c, pt))
+              result3.add(Row(0, 1, c, pt))
+              result3.add(Row(1, 0, c, pt))
+              result3.add(Row(1, 1, c, pt))
+              result3.add(Row(0, 2, c, pt))
+              result3.add(Row(1, 2, c, pt))
+              result3.add(Row(2, 0, c, pt))
+              result3.add(Row(2, 1, c, pt))
+              result3.add(Row(2, 2, c, pt))
+              for (a <- 0 until 3) {
+                result3.add(Row(a, 3, null, pt))
+              }
+              for (b <- 0 until 4) {
+                result3.add(Row(3, b, null, pt))
+              }
+            }
+            
Assertions.assertThat(query().collect()).containsExactlyElementsOf(result3)
+
+            // second cluster
+            checkAnswer(spark.sql("CALL paimon.sys.compact(table => 'T')"), 
Row(true) :: Nil)
+            val result4 = new util.ArrayList[Row]()
+            // for partition-0: only the level-0 files are picked for clustering, outputLevel is 4
+            result4.add(Row(0, 0, randomStr, 0))
+            result4.add(Row(0, 1, randomStr, 0))
+            result4.add(Row(1, 0, randomStr, 0))
+            result4.add(Row(1, 1, randomStr, 0))
+            result4.add(Row(0, 2, randomStr, 0))
+            result4.add(Row(1, 2, randomStr, 0))
+            result4.add(Row(2, 0, randomStr, 0))
+            result4.add(Row(2, 1, randomStr, 0))
+            result4.add(Row(2, 2, randomStr, 0))
+            result4.add(Row(0, 3, null, 0))
+            result4.add(Row(1, 3, null, 0))
+            result4.add(Row(3, 0, null, 0))
+            result4.add(Row(3, 1, null, 0))
+            result4.add(Row(2, 3, null, 0))
+            result4.add(Row(3, 2, null, 0))
+            result4.add(Row(3, 3, null, 0))
+            // for partition-1: all files are picked for clustering, outputLevel is 5
+            result4.add(Row(0, 0, null, 1))
+            result4.add(Row(0, 1, null, 1))
+            result4.add(Row(1, 0, null, 1))
+            result4.add(Row(1, 1, null, 1))
+            result4.add(Row(0, 2, null, 1))
+            result4.add(Row(0, 3, null, 1))
+            result4.add(Row(1, 2, null, 1))
+            result4.add(Row(1, 3, null, 1))
+            result4.add(Row(2, 0, null, 1))
+            result4.add(Row(2, 1, null, 1))
+            result4.add(Row(3, 0, null, 1))
+            result4.add(Row(3, 1, null, 1))
+            result4.add(Row(2, 2, null, 1))
+            result4.add(Row(2, 3, null, 1))
+            result4.add(Row(3, 2, null, 1))
+            result4.add(Row(3, 3, null, 1))
+
+            
Assertions.assertThat(query().collect()).containsExactlyElementsOf(result4)
+
+            clusteredTable = loadTable("T")
+            checkSnapshot(clusteredTable)
+            dataSplits = clusteredTable.newSnapshotReader().read().dataSplits()
+            Assertions.assertThat(dataSplits.size()).isEqualTo(2)
+            dataSplits.forEach(
+              dataSplit => {
+                if (dataSplit.partition().getInt(0) == 1) {
+                  // partition-1
+                  Assertions.assertThat(dataSplit.dataFiles().size()).isEqualTo(1)
+                  Assertions.assertThat(dataSplit.dataFiles().get(0).level()).isEqualTo(5)
+                } else {
+                  // partition-0
+                  Assertions.assertThat(dataSplit.dataFiles().size()).isEqualTo(2)
+                  Assertions.assertThat(dataSplit.dataFiles().get(0).level()).isEqualTo(5)
+                  Assertions.assertThat(dataSplit.dataFiles().get(1).level()).isEqualTo(4)
+                }
+              })
+          } finally {
+            stream.stop()
+          }
+      }
+    }
+  }
+
+  def checkSnapshot(table: FileStoreTable): Unit = {
+    Assertions
+      .assertThat(table.latestSnapshot().get().commitKind().toString)
+      .isEqualTo(CommitKind.COMPACT.toString)
+  }
+
   def lastSnapshotCommand(table: FileStoreTable): CommitKind = {
     table.snapshotManager().latestSnapshot().commitKind()
   }
