This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c20198b219 [core] Support incremental clustering for append bucketed table (#6961)
c20198b219 is described below

commit c20198b219f3c0d20cd5cea1681454c108701d37
Author: LsomeYeah <[email protected]>
AuthorDate: Thu Jan 8 13:43:42 2026 +0800

    [core] Support incremental clustering for append bucketed table (#6961)
---
 .../main/java/org/apache/paimon/CoreOptions.java   |   6 +
 .../org/apache/paimon/AppendOnlyFileStore.java     |   3 +-
 .../cluster/BucketedAppendClusterManager.java      | 219 +++++++++++++++++++++
 .../append/cluster/BucketedAppendLevels.java       | 137 +++++++++++++
 .../apache/paimon/append/cluster/HibertSorter.java |  80 ++++++++
 .../apache/paimon/append/cluster/OrderSorter.java  |  86 ++++++++
 .../org/apache/paimon/append/cluster/Sorter.java   | 157 +++++++++++++++
 .../apache/paimon/append/cluster/ZorderSorter.java |  80 ++++++++
 .../paimon/operation/BaseAppendFileStoreWrite.java |  38 ++++
 .../operation/BucketedAppendFileStoreWrite.java    |  18 +-
 .../org/apache/paimon/schema/SchemaValidation.java |  13 +-
 .../cluster/BucketedAppendClusterManagerTest.java  | 154 +++++++++++++++
 .../cluster/IncrementalClusterManagerTest.java     |  12 --
 .../apache/paimon/append/cluster/SorterTest.java   | 139 +++++++++++++
 .../apache/paimon/flink/action/CompactAction.java  |   6 +-
 .../action/IncrementalClusterActionITCase.java     | 165 ++++++++++++++++
 16 files changed, 1294 insertions(+), 19 deletions(-)

diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 78270d7523..ade2ac9c37 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -3193,6 +3193,12 @@ public class CoreOptions implements Serializable {
         return options.get(CLUSTERING_INCREMENTAL);
     }
 
+    public boolean bucketClusterEnabled() {
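+        // Per-bucket clustering requires incremental clustering and is incompatible
+        // with append ordering and deletion vectors.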
+        return !bucketAppendOrdered()
+                && !deletionVectorsEnabled()
+                && clusteringIncrementalEnabled();
+    }
+
     public Duration clusteringHistoryPartitionIdleTime() {
         return options.get(CLUSTERING_HISTORY_PARTITION_IDLE_TIME);
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index f3f0859d8c..51cfe58b42 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -141,7 +141,8 @@ public class AppendOnlyFileStore extends AbstractFileStore<InternalRow> {
                     newScan(),
                     options,
                     dvMaintainerFactory,
-                    tableName);
+                    tableName,
+                    schemaManager);
         }
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/cluster/BucketedAppendClusterManager.java b/paimon-core/src/main/java/org/apache/paimon/append/cluster/BucketedAppendClusterManager.java
new file mode 100644
index 0000000000..154d10c9d3
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/append/cluster/BucketedAppendClusterManager.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append.cluster;
+
+import org.apache.paimon.AppendOnlyFileStore;
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.compact.CompactFutureManager;
+import org.apache.paimon.compact.CompactResult;
+import org.apache.paimon.compact.CompactTask;
+import org.apache.paimon.compact.CompactUnit;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.mergetree.LevelSortedRun;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.utils.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+/** Cluster manager for {@link AppendOnlyFileStore}. */
+public class BucketedAppendClusterManager extends CompactFutureManager {
+
+    private static final Logger LOG = LoggerFactory.getLogger(BucketedAppendClusterManager.class);
+
+    private final ExecutorService executor;
+    private final BucketedAppendLevels levels;
+    private final IncrementalClusterStrategy strategy;
+    private final CompactRewriter rewriter;
+
+    public BucketedAppendClusterManager(
+            ExecutorService executor,
+            List<DataFileMeta> restored,
+            SchemaManager schemaManager,
+            List<String> clusterKeys,
+            int maxSizeAmp,
+            int sizeRatio,
+            int numRunCompactionTrigger,
+            int numLevels,
+            CompactRewriter rewriter) {
+        this.executor = executor;
+        this.levels = new BucketedAppendLevels(restored, numLevels);
+        this.strategy =
+                new IncrementalClusterStrategy(
+                        schemaManager, clusterKeys, maxSizeAmp, sizeRatio, numRunCompactionTrigger);
+        this.rewriter = rewriter;
+    }
+
+    @Override
+    public boolean shouldWaitForLatestCompaction() {
+        return false;
+    }
+
+    @Override
+    public boolean shouldWaitForPreparingCheckpoint() {
+        return false;
+    }
+
+    @Override
+    public void addNewFile(DataFileMeta file) {
+        levels.addLevel0File(file);
+    }
+
+    @Override
+    public List<DataFileMeta> allFiles() {
+        return levels.allFiles();
+    }
+
+    @Override
+    public void triggerCompaction(boolean fullCompaction) {
+        Optional<CompactUnit> optionalUnit;
+        List<LevelSortedRun> runs = levels.levelSortedRuns();
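+        // A forced full compaction always asks the strategy to pick across all runs,
+        // while a normal trigger is skipped if a previous task is still running.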
+        if (fullCompaction) {
+            Preconditions.checkState(
+                    taskFuture == null,
+                    "A compaction task is still running while the user "
+                            + "forces a new compaction. This is unexpected.");
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(
+                        "Trigger forced full compaction. Picking from the following runs\n{}",
+                        runs);
+            }
+            optionalUnit = strategy.pick(levels.numberOfLevels(), runs, true);
+        } else {
+            if (taskFuture != null) {
+                return;
+            }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Trigger normal compaction. Picking from the following runs\n{}", runs);
+            }
+            optionalUnit =
+                    strategy.pick(levels.numberOfLevels(), runs, false)
+                            .filter(unit -> !unit.files().isEmpty());
+        }
+
+        optionalUnit.ifPresent(
+                unit -> {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug(
+                                "Submit compaction with files (name, level, size): "
+                                        + levels.levelSortedRuns().stream()
+                                                .flatMap(lsr -> lsr.run().files().stream())
+                                                .map(
+                                                        file ->
+                                                                String.format(
+                                                                        "(%s, %d, %d)",
+                                                                        file.fileName(),
+                                                                        file.level(),
+                                                                        file.fileSize()))
+                                                .collect(Collectors.joining(", ")));
+                    }
+                    submitCompaction(unit);
+                });
+    }
+
+    private void submitCompaction(CompactUnit unit) {
+
+        BucketedAppendClusterTask task =
+                new BucketedAppendClusterTask(unit.files(), unit.outputLevel(), rewriter);
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug(
+                    "Pick these files (name, level, size) for {} compaction: {}",
+                    task.getClass().getSimpleName(),
+                    unit.files().stream()
+                            .map(
+                                    file ->
+                                            String.format(
+                                                    "(%s, %d, %d)",
+                                                    file.fileName(), file.level(), file.fileSize()))
+                            .collect(Collectors.joining(", ")));
+        }
+        taskFuture = executor.submit(task);
+    }
+
+    @Override
+    public Optional<CompactResult> getCompactionResult(boolean blocking)
+            throws ExecutionException, InterruptedException {
+        Optional<CompactResult> result = innerGetCompactionResult(blocking);
+        result.ifPresent(
+                r -> {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug(
+                                "Update levels in compact manager with these changes:\nBefore:\n{}\nAfter:\n{}",
+                                r.before(),
+                                r.after());
+                    }
+                    levels.update(r.before(), r.after());
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug(
+                                "Levels in compact manager updated. Current runs are\n{}",
+                                levels.levelSortedRuns());
+                    }
+                });
+        return result;
+    }
+
+    @Override
+    public void close() throws IOException {}
+
+    @VisibleForTesting
+    public BucketedAppendLevels levels() {
+        return levels;
+    }
+
+    /** A {@link CompactTask} implementation for clustering an append bucketed table. */
+    public static class BucketedAppendClusterTask extends CompactTask {
+
+        private final List<DataFileMeta> toCluster;
+        private final int outputLevel;
+        private final CompactRewriter rewriter;
+
+        public BucketedAppendClusterTask(
+                List<DataFileMeta> toCluster, int outputLevel, CompactRewriter rewriter) {
+            super(null);
+            this.toCluster = toCluster;
+            this.outputLevel = outputLevel;
+            this.rewriter = rewriter;
+        }
+
+        @Override
+        protected CompactResult doCompact() throws Exception {
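+            // Rewrite (cluster) the picked files, then mark the rewritten files with
+            // the target output level.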
+            List<DataFileMeta> rewrite = rewriter.rewrite(toCluster);
+            return new CompactResult(toCluster, upgrade(rewrite));
+        }
+
+        protected List<DataFileMeta> upgrade(List<DataFileMeta> files) {
+            return files.stream()
+                    .map(file -> file.upgrade(outputLevel))
+                    .collect(Collectors.toList());
+        }
+    }
+
+    /** Compact rewriter for append-only tables. */
+    public interface CompactRewriter {
+        List<DataFileMeta> rewrite(List<DataFileMeta> compactBefore) throws Exception;
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/cluster/BucketedAppendLevels.java b/paimon-core/src/main/java/org/apache/paimon/append/cluster/BucketedAppendLevels.java
new file mode 100644
index 0000000000..ba62fd9748
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/append/cluster/BucketedAppendLevels.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append.cluster;
+
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.mergetree.LevelSortedRun;
+import org.apache.paimon.mergetree.SortedRun;
+import org.apache.paimon.utils.Preconditions;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.emptyList;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** A class which stores all level files of an append bucketed table. */
+public class BucketedAppendLevels {
+
+    private final HashSet<DataFileMeta> level0;
+
+    private final List<SortedRun> levels;
+
+    public BucketedAppendLevels(List<DataFileMeta> inputFiles, int numLevels) {
+        int restoredNumLevels =
+                Math.max(
+                        numLevels,
+                        inputFiles.stream().mapToInt(DataFileMeta::level).max().orElse(-1) + 1);
+        checkArgument(restoredNumLevels > 1, "Number of levels must be at least 2.");
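+        // Level 0 keeps newly appended files as an unordered set; every higher level
+        // holds exactly one sorted run, stored at index level - 1 in 'levels'.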
+        this.level0 = new HashSet<>();
+        this.levels = new ArrayList<>();
+        for (int i = 1; i < restoredNumLevels; i++) {
+            levels.add(SortedRun.empty());
+        }
+
+        Map<Integer, List<DataFileMeta>> levelMap = new HashMap<>();
+        for (DataFileMeta file : inputFiles) {
+            levelMap.computeIfAbsent(file.level(), level -> new ArrayList<>()).add(file);
+        }
+        levelMap.forEach((level, files) -> updateLevel(level, emptyList(), files));
+
+        Preconditions.checkState(
+                level0.size() + levels.stream().mapToInt(r -> r.files().size()).sum()
+                        == inputFiles.size(),
+                "Number of files stored in Levels does not equal the size of inputFiles. This is unexpected.");
+    }
+
+    public void addLevel0File(DataFileMeta file) {
+        checkArgument(file.level() == 0);
+        level0.add(file);
+    }
+
+    public SortedRun runOfLevel(int level) {
+        checkArgument(level > 0, "Level0 does not have one single sorted run.");
+        return levels.get(level - 1);
+    }
+
+    public int numberOfLevels() {
+        return levels.size() + 1;
+    }
+
+    public int maxLevel() {
+        return levels.size();
+    }
+
+    public List<DataFileMeta> allFiles() {
+        List<DataFileMeta> files = new ArrayList<>();
+        List<LevelSortedRun> runs = levelSortedRuns();
+        for (LevelSortedRun run : runs) {
+            files.addAll(run.run().files());
+        }
+        return files;
+    }
+
+    public List<LevelSortedRun> levelSortedRuns() {
+        List<LevelSortedRun> runs = new ArrayList<>();
+        level0.forEach(file -> runs.add(new LevelSortedRun(0, SortedRun.fromSingle(file))));
+        for (int i = 0; i < levels.size(); i++) {
+            SortedRun run = levels.get(i);
+            if (run.nonEmpty()) {
+                runs.add(new LevelSortedRun(i + 1, run));
+            }
+        }
+        return runs;
+    }
+
+    public void update(List<DataFileMeta> before, List<DataFileMeta> after) {
+        Map<Integer, List<DataFileMeta>> groupedBefore = groupByLevel(before);
+        Map<Integer, List<DataFileMeta>> groupedAfter = groupByLevel(after);
+        for (int i = 0; i < numberOfLevels(); i++) {
+            updateLevel(
+                    i,
+                    groupedBefore.getOrDefault(i, emptyList()),
+                    groupedAfter.getOrDefault(i, emptyList()));
+        }
+    }
+
+    private void updateLevel(int level, List<DataFileMeta> before, List<DataFileMeta> after) {
+        if (before.isEmpty() && after.isEmpty()) {
+            return;
+        }
+
+        if (level == 0) {
+            before.forEach(level0::remove);
+            level0.addAll(after);
+        } else {
+            List<DataFileMeta> files = new ArrayList<>(runOfLevel(level).files());
+            files.removeAll(before);
+            files.addAll(after);
+            levels.set(level - 1, SortedRun.fromSorted(files));
+        }
+    }
+
+    private Map<Integer, List<DataFileMeta>> groupByLevel(List<DataFileMeta> files) {
+        return files.stream()
+                .collect(Collectors.groupingBy(DataFileMeta::level, Collectors.toList()));
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/cluster/HibertSorter.java b/paimon-core/src/main/java/org/apache/paimon/append/cluster/HibertSorter.java
new file mode 100644
index 0000000000..fcd59ce0d3
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/append/cluster/HibertSorter.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append.cluster;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.JoinedRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.reader.RecordReaderIterator;
+import org.apache.paimon.sort.hilbert.HilbertIndexer;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/** Hilbert sorter for clustering. */
+public class HibertSorter extends Sorter {
+
+    private static final RowType KEY_TYPE =
+            new RowType(Collections.singletonList(new DataField(0, "H_INDEX", DataTypes.BYTES())));
+
+    private final HibertKeyAbstract hilbertKeyAbstract;
+
+    public HibertSorter(
+            RecordReaderIterator<InternalRow> reader,
+            RowType valueType,
+            CoreOptions options,
+            List<String> orderColNames,
+            IOManager ioManager) {
+        super(reader, KEY_TYPE, valueType, options, ioManager);
+        this.hilbertKeyAbstract = new HibertKeyAbstract(valueType, orderColNames);
+        this.hilbertKeyAbstract.open();
+    }
+
+    @Override
+    public InternalRow assignSortKey(InternalRow row) {
+        byte[] key = hilbertKeyAbstract.apply(row);
+        return new JoinedRow(GenericRow.of(key), row);
+    }
+
+    private static class HibertKeyAbstract implements KeyAbstract<byte[]> {
+
+        private final HilbertIndexer hilbertIndexer;
+
+        public HibertKeyAbstract(RowType rowType, List<String> orderColNames) {
+            hilbertIndexer = new HilbertIndexer(rowType, orderColNames);
+        }
+
+        @Override
+        public void open() {
+            hilbertIndexer.open();
+        }
+
+        @Override
+        public byte[] apply(InternalRow value) {
+            byte[] hilbert = hilbertIndexer.index(value);
+            return Arrays.copyOf(hilbert, hilbert.length);
+        }
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/cluster/OrderSorter.java b/paimon-core/src/main/java/org/apache/paimon/append/cluster/OrderSorter.java
new file mode 100644
index 0000000000..ee2ace3f3a
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/append/cluster/OrderSorter.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append.cluster;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.JoinedRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.reader.RecordReaderIterator;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Projection;
+
+import java.util.List;
+
+import static org.apache.paimon.table.PrimaryKeyTableUtils.addKeyNamePrefix;
+
+/** Order sorter for clustering. */
+public class OrderSorter extends Sorter {
+
+    private final OrderKeyAbstract orderKeyAbstract;
+
+    public OrderSorter(
+            RecordReaderIterator<InternalRow> reader,
+            RowType valueType,
+            CoreOptions options,
+            List<String> orderColNames,
+            IOManager ioManager) {
+        super(reader, keyType(valueType, orderColNames), valueType, options, ioManager);
+        this.orderKeyAbstract = new OrderKeyAbstract(valueType, orderColNames);
+        this.orderKeyAbstract.open();
+    }
+
+    @Override
+    public InternalRow assignSortKey(InternalRow row) {
+        InternalRow key = orderKeyAbstract.apply(row);
+        return new JoinedRow(key, row);
+    }
+
+    private static RowType keyType(RowType valueType, List<String> orderColNames) {
+        List<String> fieldNames = valueType.getFieldNames();
+        int[] keyProjectionMap = orderColNames.stream().mapToInt(fieldNames::indexOf).toArray();
+        return addKeyNamePrefix(Projection.of(keyProjectionMap).project(valueType));
+    }
+
+    private static class OrderKeyAbstract implements KeyAbstract<InternalRow> {
+
+        private final RowType valueRowType;
+        private final int[] keyProjectionMap;
+
+        private transient org.apache.paimon.codegen.Projection keyProjection;
+
+        public OrderKeyAbstract(RowType rowType, List<String> orderColNames) {
+            this.valueRowType = rowType;
+            List<String> fieldNames = rowType.getFieldNames();
+            this.keyProjectionMap = orderColNames.stream().mapToInt(fieldNames::indexOf).toArray();
+        }
+
+        @Override
+        public void open() {
+            // use key gen to speed up projection
+            keyProjection = CodeGenUtils.newProjection(valueRowType, keyProjectionMap);
+        }
+
+        @Override
+        public InternalRow apply(InternalRow value) {
+            return keyProjection.apply(value).copy();
+        }
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/cluster/Sorter.java b/paimon-core/src/main/java/org/apache/paimon/append/cluster/Sorter.java
new file mode 100644
index 0000000000..60dbecdde0
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/append/cluster/Sorter.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append.cluster;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.compression.CompressOptions;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.reader.RecordReaderIterator;
+import org.apache.paimon.sort.BinaryExternalSortBuffer;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.KeyProjectedRow;
+import org.apache.paimon.utils.MutableObjectIterator;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.IntStream;
+
+/** Sorter for clustering. */
+public abstract class Sorter {
+
+    protected final RecordReaderIterator<InternalRow> reader;
+    protected final RowType keyType;
+    protected final RowType longRowType;
+    protected final int[] valueProjectionMap;
+    private final int arity;
+
+    private final transient IOManager ioManager;
+    private final transient BinaryExternalSortBuffer buffer;
+
+    public Sorter(
+            RecordReaderIterator<InternalRow> reader,
+            RowType keyType,
+            RowType valueType,
+            CoreOptions options,
+            IOManager ioManager) {
+        this.reader = reader;
+        this.keyType = keyType;
+        int keyFieldCount = keyType.getFieldCount();
+        int valueFieldCount = valueType.getFieldCount();
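+        // Sorted rows are laid out as (key fields, value fields); this projection maps
+        // the value part back out after sorting (see removeSortKey).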
+        this.valueProjectionMap = new int[valueFieldCount];
+        for (int i = 0; i < valueFieldCount; i++) {
+            this.valueProjectionMap[i] = i + keyFieldCount;
+        }
+        List<DataField> keyFields = keyType.getFields();
+        List<DataField> dataFields = valueType.getFields();
+        List<DataField> fields = new ArrayList<>();
+        fields.addAll(keyFields);
+        fields.addAll(dataFields);
+        this.longRowType = new RowType(fields);
+        this.arity = longRowType.getFieldCount();
+
+        long maxMemory = options.writeBufferSize();
+        int pageSize = options.pageSize();
+        int spillSortMaxNumFiles = options.localSortMaxNumFileHandles();
+        CompressOptions spillCompression = options.spillCompressOptions();
+        MemorySize maxDiskSize = options.writeBufferSpillDiskSize();
+        boolean sequenceOrder = options.sequenceFieldSortOrderIsAscending();
+
+        this.ioManager = ioManager;
+        this.buffer =
+                BinaryExternalSortBuffer.create(
+                        ioManager,
+                        longRowType,
+                        IntStream.range(0, keyType.getFieldCount()).toArray(),
+                        maxMemory,
+                        pageSize,
+                        spillSortMaxNumFiles,
+                        spillCompression,
+                        maxDiskSize,
+                        sequenceOrder);
+    }
+
+    public abstract InternalRow assignSortKey(InternalRow row);
+
+    public InternalRow removeSortKey(InternalRow rowWithKey) {
+        KeyProjectedRow keyProjectedRow = new KeyProjectedRow(valueProjectionMap);
+        return keyProjectedRow.replaceRow(rowWithKey);
+    }
+
+    public MutableObjectIterator<BinaryRow> sort() throws IOException {
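+        // Prefix each row with its sort key and push it into the external sort buffer,
+        // which can spill to disk through the provided IOManager.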
+        while (reader.hasNext()) {
+            InternalRow row = reader.next();
+            InternalRow rowWithKey = assignSortKey(row);
+            buffer.write(rowWithKey);
+        }
+
+        if (buffer.size() > 0) {
+            return buffer.sortedIterator();
+        } else {
+            throw new IllegalStateException("numRecords after sorting is 0.");
+        }
+    }
+
+    public int arity() {
+        return arity;
+    }
+
+    public void close() throws Exception {
+        if (buffer != null) {
+            buffer.clear();
+        }
+        if (ioManager != null) {
+            ioManager.close();
+        }
+    }
+
+    public static Sorter getSorter(
+            RecordReaderIterator<InternalRow> reader,
+            IOManager ioManager,
+            RowType rowType,
+            CoreOptions options) {
+        CoreOptions.OrderType clusterCurve =
+                options.clusteringStrategy(options.clusteringColumns().size());
+        switch (clusterCurve) {
+            case HILBERT:
+                return new HibertSorter(
+                        reader, rowType, options, options.clusteringColumns(), ioManager);
+            case ZORDER:
+                return new ZorderSorter(
+                        reader, rowType, options, options.clusteringColumns(), ioManager);
+            case ORDER:
+                return new OrderSorter(
+                        reader, rowType, options, options.clusteringColumns(), ioManager);
+            default:
+                throw new IllegalArgumentException("cannot match cluster type: " + clusterCurve);
+        }
+    }
+
+    /** Abstracts a key from a data row. */
+    public interface KeyAbstract<KEY> extends Serializable {
+        default void open() {}
+
+        KEY apply(InternalRow value);
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/cluster/ZorderSorter.java b/paimon-core/src/main/java/org/apache/paimon/append/cluster/ZorderSorter.java
new file mode 100644
index 0000000000..57e95d06c6
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/append/cluster/ZorderSorter.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append.cluster;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.JoinedRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.reader.RecordReaderIterator;
+import org.apache.paimon.sort.zorder.ZIndexer;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/** Z-order sorter for clustering. */
+public class ZorderSorter extends Sorter {
+
+    private static final RowType KEY_TYPE =
+            new RowType(Collections.singletonList(new DataField(0, "Z_INDEX", DataTypes.BYTES())));
+
+    private final ZorderKeyAbstract zorderKeyAbstract;
+
+    public ZorderSorter(
+            RecordReaderIterator<InternalRow> reader,
+            RowType valueType,
+            CoreOptions options,
+            List<String> orderColNames,
+            IOManager ioManager) {
+        super(reader, KEY_TYPE, valueType, options, ioManager);
+        this.zorderKeyAbstract = new ZorderKeyAbstract(valueType, options, orderColNames);
+        this.zorderKeyAbstract.open();
+    }
+
+    @Override
+    public InternalRow assignSortKey(InternalRow row) {
+        byte[] key = zorderKeyAbstract.apply(row);
+        return new JoinedRow(GenericRow.of(key), row);
+    }
+
+    private static class ZorderKeyAbstract implements KeyAbstract<byte[]> {
+
+        private final ZIndexer zIndexer;
+
+        public ZorderKeyAbstract(RowType rowType, CoreOptions options, List<String> orderColNames) {
+            zIndexer = new ZIndexer(rowType, orderColNames, options.varTypeSize());
+        }
+
+        @Override
+        public void open() {
+            zIndexer.open();
+        }
+
+        @Override
+        public byte[] apply(InternalRow value) {
+            byte[] zorder = zIndexer.index(value);
+            return Arrays.copyOf(zorder, zorder.length);
+        }
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
index 6dee947308..1c63f41b02 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
@@ -21,6 +21,7 @@ package org.apache.paimon.operation;
 import org.apache.paimon.AppendOnlyFileStore;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.append.AppendOnlyWriter;
+import org.apache.paimon.append.cluster.Sorter;
 import org.apache.paimon.compact.CompactManager;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
@@ -41,6 +42,7 @@ import org.apache.paimon.utils.ExceptionUtils;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.IOExceptionSupplier;
 import org.apache.paimon.utils.LongCounter;
+import org.apache.paimon.utils.MutableObjectIterator;
 import org.apache.paimon.utils.RecordWriter;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.StatsCollectorFactories;
@@ -205,6 +207,42 @@ public abstract class BaseAppendFileStoreWrite extends MemoryFileStoreWrite<Inte
         return rewriter.result();
     }
 
+    public List<DataFileMeta> clusterRewrite(
+            BinaryRow partition, int bucket, List<DataFileMeta> toCluster) throws Exception {
+        RecordReaderIterator<InternalRow> reader =
+                createFilesIterator(partition, bucket, toCluster, null);
+
+        // sort and rewrite
+        Exception collectedExceptions = null;
+        Sorter sorter = Sorter.getSorter(reader, ioManager, rowType, options);
+        RowDataRollingFileWriter rewriter =
+                createRollingFileWriter(
+                        partition, bucket, new LongCounter(toCluster.get(0).minSequenceNumber()));
+        try {
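+            // Drain the sorted iterator, strip the injected sort key from each row and
+            // write the result into new rolling files.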
+            MutableObjectIterator<BinaryRow> sorted = sorter.sort();
+            BinaryRow binaryRow = new BinaryRow(sorter.arity());
+            while ((binaryRow = sorted.next(binaryRow)) != null) {
+                InternalRow rowRemovedKey = sorter.removeSortKey(binaryRow);
+                rewriter.write(rowRemovedKey);
+            }
+        } catch (Exception e) {
+            collectedExceptions = e;
+        } finally {
+            try {
+                rewriter.close();
+                sorter.close();
+            } catch (Exception e) {
+                collectedExceptions = ExceptionUtils.firstOrSuppressed(e, collectedExceptions);
+            }
+        }
+
+        if (collectedExceptions != null) {
+            throw collectedExceptions;
+        }
+
+        return rewriter.result();
+    }
+
     private RowDataRollingFileWriter createRollingFileWriter(
             BinaryRow partition, int bucket, LongCounter seqNumCounter) {
         return new RowDataRollingFileWriter(
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java
index a3f2083e25..b93c2b5115 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java
@@ -20,6 +20,7 @@ package org.apache.paimon.operation;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.append.BucketedAppendCompactManager;
+import org.apache.paimon.append.cluster.BucketedAppendClusterManager;
 import org.apache.paimon.compact.CompactManager;
 import org.apache.paimon.compact.NoopCompactManager;
 import org.apache.paimon.data.BinaryRow;
@@ -28,6 +29,7 @@ import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
 import org.apache.paimon.deletionvectors.DeletionVector;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.SnapshotManager;
@@ -42,6 +44,7 @@ import java.util.function.Function;
 public class BucketedAppendFileStoreWrite extends BaseAppendFileStoreWrite {
 
     private final String commitUser;
+    private final SchemaManager schemaManager;
 
     public BucketedAppendFileStoreWrite(
             FileIO fileIO,
@@ -55,7 +58,8 @@ public class BucketedAppendFileStoreWrite extends BaseAppendFileStoreWrite {
             FileStoreScan scan,
             CoreOptions options,
             @Nullable BucketedDvMaintainer.Factory dvMaintainerFactory,
-            String tableName) {
+            String tableName,
+            SchemaManager schemaManager) {
         super(
                 fileIO,
                 read,
@@ -72,6 +76,7 @@ public class BucketedAppendFileStoreWrite extends BaseAppendFileStoreWrite {
             super.withIgnorePreviousFiles(options.writeOnly());
         }
         this.commitUser = commitUser;
+        this.schemaManager = schemaManager;
     }
 
     @Override
@@ -94,6 +99,17 @@ public class BucketedAppendFileStoreWrite extends BaseAppendFileStoreWrite {
             @Nullable BucketedDvMaintainer dvMaintainer) {
         if (options.writeOnly()) {
             return new NoopCompactManager();
+        } else if (options.bucketClusterEnabled()) {
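+            // Use the incremental cluster manager instead of the ordinary compaction
+            // manager when per-bucket clustering is enabled.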
+            return new BucketedAppendClusterManager(
+                    compactExecutor,
+                    restoredFiles,
+                    schemaManager,
+                    options.clusteringColumns(),
+                    options.maxSizeAmplificationPercent(),
+                    options.sortedRunSizeRatio(),
+                    options.numSortedRunCompactionTrigger(),
+                    options.numLevels(),
+                    files -> clusterRewrite(partition, bucket, files));
         } else {
             Function<String, DeletionVector> dvFactory =
                     dvMaintainer != null
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index abbc2e4912..1378ecc586 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -673,14 +673,19 @@ public class SchemaValidation {
 
     private static void validateIncrementalClustering(TableSchema schema, CoreOptions options) {
         if (options.clusteringIncrementalEnabled()) {
-            checkArgument(
-                    options.bucket() == -1,
-                    "Cannot define %s for incremental clustering  table, it only support bucket = -1",
-                    CoreOptions.BUCKET.key());
             checkArgument(
                     schema.primaryKeys().isEmpty(),
                     "Cannot define %s for incremental clustering table.",
                     PRIMARY_KEY.key());
+            if (options.bucket() != -1) {
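+                // Bucketed tables may use incremental clustering only when append
+                // ordering and deletion vectors are both disabled.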
+                checkArgument(
+                        !options.bucketAppendOrdered(),
+                        "%s must be false for incremental clustering table.",
+                        CoreOptions.BUCKET_APPEND_ORDERED.key());
+                checkArgument(
+                        !options.deletionVectorsEnabled(),
+                        "Cannot enable deletion-vectors for incremental clustering table whose bucket is not -1.");
+            }
         }
     }
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/cluster/BucketedAppendClusterManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/append/cluster/BucketedAppendClusterManagerTest.java
new file mode 100644
index 0000000000..0d2d60e699
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/append/cluster/BucketedAppendClusterManagerTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append.cluster;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.FileSystemCatalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.compact.CompactResult;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.operation.BaseAppendFileStoreWrite;
+import org.apache.paimon.reader.RecordReaderIterator;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.AppendOnlyFileStoreTable;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.StreamTableCommit;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link BucketedAppendClusterManager}. */
+public class BucketedAppendClusterManagerTest {
+
+    @TempDir java.nio.file.Path tempDir;
+    @TempDir java.nio.file.Path ioManagerTempDir;
+
+    FileStoreTable table;
+    BaseAppendFileStoreWrite write;
+    StreamTableCommit commit;
+
+    @BeforeEach
+    public void before() throws Exception {
+        table = createFileStoreTable();
+        write =
+                (BaseAppendFileStoreWrite)
+                        table.store()
+                                .newWrite("ss")
+                                .withIOManager(IOManager.create(ioManagerTempDir.toString()));
+        commit = table.newStreamWriteBuilder().newCommit();
+
+        for (int i = 0; i < 3; i++) {
+            for (int j = 0; j < 3; j++) {
+                write.write(BinaryRow.EMPTY_ROW, 0, GenericRow.of(0, i, j));
+                commit.commit(i, write.prepareCommit(false, i));
+            }
+        }
+    }
+
+    @Test
+    public void testBucketedAppendClusterTask() throws Exception {
+        List<DataFileMeta> toCluster =
+                table.newSnapshotReader().read().dataSplits().get(0).dataFiles();
+
+        BucketedAppendClusterManager.BucketedAppendClusterTask task =
+                new BucketedAppendClusterManager.BucketedAppendClusterTask(
+                        toCluster, 5, files -> write.clusterRewrite(BinaryRow.EMPTY_ROW, 0, files));
+
+        CompactResult result = task.doCompact();
+        assertThat(result.before().size()).isEqualTo(9);
+        assertThat(result.after().size()).isEqualTo(1);
+        List<String> rows = new ArrayList<>();
+        try (RecordReaderIterator<InternalRow> clusterRows =
+                new RecordReaderIterator<>(
+                        ((AppendOnlyFileStoreTable) table)
+                                .store()
+                                .newRead()
+                                .createReader(BinaryRow.EMPTY_ROW, 0, result.after(), null))) {
+            while (clusterRows.hasNext()) {
+                InternalRow row = clusterRows.next();
+                rows.add(String.format("%d,%d", row.getInt(1), row.getInt(2)));
+            }
+        }
+
+        assertThat(rows)
+                .containsExactly("0,0", "0,1", "1,0", "1,1", "0,2", "1,2", "2,0", "2,1", "2,2");
+    }
+
+    @Test
+    public void testTriggerCompaction() throws Exception {
+        List<DataFileMeta> toCluster =
+                table.newSnapshotReader().read().dataSplits().get(0).dataFiles();
+        CoreOptions options = table.coreOptions();
+        BucketedAppendClusterManager manager =
+                new BucketedAppendClusterManager(
+                        Executors.newSingleThreadExecutor(),
+                        toCluster,
+                        table.schemaManager(),
+                        options.clusteringColumns(),
+                        options.maxSizeAmplificationPercent(),
+                        options.sortedRunSizeRatio(),
+                        options.numSortedRunCompactionTrigger(),
+                        options.numLevels(),
+                        files -> write.clusterRewrite(BinaryRow.EMPTY_ROW, 0, files));
+        assertThat(manager.levels().levelSortedRuns().size()).isEqualTo(9);
+
+        manager.triggerCompaction(false);
+
+        CompactResult compactResult = manager.getCompactionResult(true).get();
+        assertThat(compactResult.before().size()).isEqualTo(9);
+        assertThat(compactResult.after().size()).isEqualTo(1);
+
+        assertThat(manager.levels().levelSortedRuns().size()).isEqualTo(1);
+        assertThat(manager.levels().levelSortedRuns().get(0).level()).isEqualTo(5);
+    }
+
+    private FileStoreTable createFileStoreTable() throws Exception {
+        Catalog catalog = new FileSystemCatalog(LocalFileIO.create(), new Path(tempDir.toString()));
+        Schema schema =
+                Schema.newBuilder()
+                        .column("f0", DataTypes.INT())
+                        .column("f1", DataTypes.INT())
+                        .column("f2", DataTypes.INT())
+                        .option("bucket", "1")
+                        .option("bucket-key", "f0")
+                        .option("compaction.min.file-num", "10")
+                        .option("clustering.columns", "f1,f2")
+                        .option("clustering.strategy", "zorder")
+                        .build();
+        Identifier identifier = Identifier.create("default", "test");
+        catalog.createDatabase("default", false);
+        catalog.createTable(identifier, schema, false);
+        return (FileStoreTable) catalog.getTable(identifier);
+    }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java
index 516fa63b55..cf81cdf85f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java
@@ -57,18 +57,6 @@ public class IncrementalClusterManagerTest {
 
     @TempDir java.nio.file.Path tempDir;
 
-    @Test
-    public void testNonUnAwareBucketTable() {
-        Map<String, String> options = new HashMap<>();
-        options.put(CoreOptions.BUCKET.key(), "1");
-        options.put(CoreOptions.BUCKET_KEY.key(), "f0");
-
-        assertThatThrownBy(() -> createTable(options, Collections.emptyList()))
-                .isInstanceOf(IllegalArgumentException.class)
-                .hasMessageContaining(
-                        "Cannot define bucket for incremental clustering  table, it only support bucket = -1");
-    }
-
     @Test
     public void testNonClusterIncremental() throws Exception {
         Map<String, String> options = new HashMap<>();
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/cluster/SorterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/cluster/SorterTest.java
new file mode 100644
index 0000000000..31fbeb5c28
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/append/cluster/SorterTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append.cluster;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.reader.RecordReaderIterator;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.MutableObjectIterator;
+
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.append.cluster.IncrementalClusterManagerTest.writeOnce;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link Sorter}. */
+public class SorterTest {
+    @TempDir java.nio.file.Path tableTempDir;
+
+    @TempDir java.nio.file.Path ioTempDir;
+
+    @ParameterizedTest
+    @ValueSource(strings = {"hilbert", "zorder", "order"})
+    public void testSorter(String curve) throws Exception {
+        innerTest(curve);
+    }
+
+    private void innerTest(String curve) throws Exception {
+        FileStoreTable table = createTable(new HashMap<>(), curve);
+        writeOnce(
+                table,
+                GenericRow.of(2, 0, BinaryString.fromString("test")),
+                GenericRow.of(2, 1, BinaryString.fromString("test")),
+                GenericRow.of(2, 2, BinaryString.fromString("test")),
+                GenericRow.of(0, 0, BinaryString.fromString("test")),
+                GenericRow.of(0, 1, BinaryString.fromString("test")),
+                GenericRow.of(0, 2, BinaryString.fromString("test")),
+                GenericRow.of(1, 0, BinaryString.fromString("test")),
+                GenericRow.of(1, 1, BinaryString.fromString("test")),
+                GenericRow.of(1, 2, BinaryString.fromString("test")));
+        IOManager ioManager = IOManager.create(ioTempDir.toString());
+        ReadBuilder readBuilder = table.newReadBuilder();
+        Sorter sorter =
+                Sorter.getSorter(
+                        new RecordReaderIterator<>(
+                                readBuilder.newRead().createReader(readBuilder.newScan().plan())),
+                        ioManager,
+                        table.rowType(),
+                        table.coreOptions());
+        List<String> result = new ArrayList<>();
+        MutableObjectIterator<BinaryRow> sorted = sorter.sort();
+        BinaryRow binaryRow = new BinaryRow(sorter.arity());
+        while ((binaryRow = sorted.next(binaryRow)) != null) {
+            InternalRow rowRemovedKey = sorter.removeSortKey(binaryRow);
+            result.add(String.format("%s,%s", rowRemovedKey.getInt(0), rowRemovedKey.getInt(1)));
+        }
+        verify(curve, result);
+    }
+
+    private void verify(String curve, List<String> result) {
+        switch (curve) {
+            case "hilbert":
+                assertThat(result)
+                        .containsExactly(
+                                "0,0", "0,1", "1,1", "1,0", "2,0", "2,1", "2,2", "1,2", "0,2");
+                break;
+            case "zorder":
+                assertThat(result)
+                        .containsExactly(
+                                "0,0", "0,1", "1,0", "1,1", "0,2", "1,2", "2,0", "2,1", "2,2");
+                break;
+            case "order":
+                assertThat(result)
+                        .containsExactly(
+                                "0,0", "0,1", "0,2", "1,0", "1,1", "1,2", "2,0", "2,1", "2,2");
+                break;
+        }
+    }
+
+    protected FileStoreTable createTable(Map<String, String> customOptions, String clusterCurve)
+            throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put(CoreOptions.CLUSTERING_COLUMNS.key(), "f0,f1");
+        options.put(CoreOptions.CLUSTERING_STRATEGY.key(), clusterCurve);
+        options.putAll(customOptions);
+
+        Schema schema =
+                new Schema(
+                        RowType.of(DataTypes.INT(), DataTypes.INT(), DataTypes.STRING())
+                                .getFields(),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        options,
+                        "");
+
+        SchemaManager schemaManager =
+                new SchemaManager(LocalFileIO.create(), new Path(tableTempDir.toString()));
+        return FileStoreTableFactory.create(
+                LocalFileIO.create(),
+                new Path(tableTempDir.toString()),
+                schemaManager.createTable(schema));
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index b807af5c86..df7fd0227e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -164,7 +164,11 @@ public class CompactAction extends TableActionBase {
             StreamExecutionEnvironment env, FileStoreTable table, boolean isStreaming)
             throws Exception {
         if (fullCompaction == null) {
-            fullCompaction = !isStreaming;
+            if (table.coreOptions().bucketClusterEnabled()) {
+                fullCompaction = false;
+            } else {
+                fullCompaction = !isStreaming;
+            }
         } else {
             checkArgument(
                     !(fullCompaction && isStreaming),
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
index 6c80070528..1538f8c083 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
@@ -683,6 +683,171 @@ public class IncrementalClusterActionITCase extends ActionITCaseBase {
         assertThat(splits.get(0).deletionFiles().get().get(0)).isNull();
     }
 
+    @Test
+    public void testClusterWithBucket() throws Exception {
+        Map<String, String> dynamicOptions = commonOptions();
+        dynamicOptions.put(CoreOptions.BUCKET.key(), "2");
+        dynamicOptions.put(CoreOptions.BUCKET_KEY.key(), "pt");
+        dynamicOptions.put(CoreOptions.BUCKET_APPEND_ORDERED.key(), "false");
+        FileStoreTable table = createTable(null, dynamicOptions);
+
+        BinaryString randomStr = BinaryString.fromString(randomString(150));
+        List<CommitMessage> messages = new ArrayList<>();
+
+        // first write
+        for (int pt = 0; pt < 2; pt++) {
+            for (int i = 0; i < 3; i++) {
+                for (int j = 0; j < 3; j++) {
+                    messages.addAll(write(GenericRow.of(i, j, randomStr, pt)));
+                }
+            }
+        }
+        commit(messages);
+        ReadBuilder readBuilder = table.newReadBuilder().withProjection(new int[] {0, 1, 3});
+        List<String> result1 =
+                getResult(
+                        readBuilder.newRead(),
+                        readBuilder.newScan().plan().splits(),
+                        readBuilder.readType());
+        List<String> expected1 = new ArrayList<>();
+        for (int pt = 0; pt <= 1; pt++) {
+            expected1.add(String.format("+I[0, 0, %s]", pt));
+            expected1.add(String.format("+I[0, 1, %s]", pt));
+            expected1.add(String.format("+I[0, 2, %s]", pt));
+            expected1.add(String.format("+I[1, 0, %s]", pt));
+            expected1.add(String.format("+I[1, 1, %s]", pt));
+            expected1.add(String.format("+I[1, 2, %s]", pt));
+            expected1.add(String.format("+I[2, 0, %s]", pt));
+            expected1.add(String.format("+I[2, 1, %s]", pt));
+            expected1.add(String.format("+I[2, 2, %s]", pt));
+        }
+        assertThat(result1).containsExactlyElementsOf(expected1);
+
+        // first cluster
+        runAction(Collections.emptyList());
+        checkSnapshot(table);
+        List<Split> splits = readBuilder.newScan().plan().splits();
+        assertThat(splits.size()).isEqualTo(2);
+        assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(1);
+        assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5);
+        List<String> result2 = getResult(readBuilder.newRead(), splits, readBuilder.readType());
+        List<String> expected2 = new ArrayList<>();
+        for (int pt = 1; pt >= 0; pt--) {
+            expected2.add(String.format("+I[0, 0, %s]", pt));
+            expected2.add(String.format("+I[0, 1, %s]", pt));
+            expected2.add(String.format("+I[1, 0, %s]", pt));
+            expected2.add(String.format("+I[1, 1, %s]", pt));
+            expected2.add(String.format("+I[0, 2, %s]", pt));
+            expected2.add(String.format("+I[1, 2, %s]", pt));
+            expected2.add(String.format("+I[2, 0, %s]", pt));
+            expected2.add(String.format("+I[2, 1, %s]", pt));
+            expected2.add(String.format("+I[2, 2, %s]", pt));
+        }
+        assertThat(result2).containsExactlyElementsOf(expected2);
+
+        // second write
+        messages.clear();
+        for (int pt = 0; pt <= 1; pt++) {
+            messages.addAll(
+                    write(
+                            GenericRow.of(0, 3, null, pt),
+                            GenericRow.of(1, 3, null, pt),
+                            GenericRow.of(2, 3, null, pt)));
+            messages.addAll(
+                    write(
+                            GenericRow.of(3, 0, null, pt),
+                            GenericRow.of(3, 1, null, pt),
+                            GenericRow.of(3, 2, null, pt),
+                            GenericRow.of(3, 3, null, pt)));
+        }
+        commit(messages);
+
+        List<String> result3 =
+                getResult(
+                        readBuilder.newRead(),
+                        readBuilder.newScan().plan().splits(),
+                        readBuilder.readType());
+        List<String> expected3 = new ArrayList<>();
+        for (int pt = 1; pt >= 0; pt--) {
+            expected3.add(String.format("+I[0, 0, %s]", pt));
+            expected3.add(String.format("+I[0, 1, %s]", pt));
+            expected3.add(String.format("+I[1, 0, %s]", pt));
+            expected3.add(String.format("+I[1, 1, %s]", pt));
+            expected3.add(String.format("+I[0, 2, %s]", pt));
+            expected3.add(String.format("+I[1, 2, %s]", pt));
+            expected3.add(String.format("+I[2, 0, %s]", pt));
+            expected3.add(String.format("+I[2, 1, %s]", pt));
+            expected3.add(String.format("+I[2, 2, %s]", pt));
+            expected3.add(String.format("+I[0, 3, %s]", pt));
+            expected3.add(String.format("+I[1, 3, %s]", pt));
+            expected3.add(String.format("+I[2, 3, %s]", pt));
+            expected3.add(String.format("+I[3, 0, %s]", pt));
+            expected3.add(String.format("+I[3, 1, %s]", pt));
+            expected3.add(String.format("+I[3, 2, %s]", pt));
+            expected3.add(String.format("+I[3, 3, %s]", pt));
+        }
+        assertThat(result3).containsExactlyElementsOf(expected3);
+
+        // second cluster (incremental)
+        runAction(Collections.emptyList());
+        checkSnapshot(table);
+        splits = readBuilder.newScan().plan().splits();
+        List<String> result4 = getResult(readBuilder.newRead(), splits, readBuilder.readType());
+        List<String> expected4 = new ArrayList<>();
+        for (int pt = 1; pt >= 0; pt--) {
+            expected4.add(String.format("+I[0, 0, %s]", pt));
+            expected4.add(String.format("+I[0, 1, %s]", pt));
+            expected4.add(String.format("+I[1, 0, %s]", pt));
+            expected4.add(String.format("+I[1, 1, %s]", pt));
+            expected4.add(String.format("+I[0, 2, %s]", pt));
+            expected4.add(String.format("+I[1, 2, %s]", pt));
+            expected4.add(String.format("+I[2, 0, %s]", pt));
+            expected4.add(String.format("+I[2, 1, %s]", pt));
+            expected4.add(String.format("+I[2, 2, %s]", pt));
+            expected4.add(String.format("+I[0, 3, %s]", pt));
+            expected4.add(String.format("+I[1, 3, %s]", pt));
+            expected4.add(String.format("+I[3, 0, %s]", pt));
+            expected4.add(String.format("+I[3, 1, %s]", pt));
+            expected4.add(String.format("+I[2, 3, %s]", pt));
+            expected4.add(String.format("+I[3, 2, %s]", pt));
+            expected4.add(String.format("+I[3, 3, %s]", pt));
+        }
+        assertThat(splits.size()).isEqualTo(2);
+        assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(2);
+        assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5);
+        assertThat(((DataSplit) splits.get(0)).dataFiles().get(1).level()).isEqualTo(4);
+        assertThat(result4).containsExactlyElementsOf(expected4);
+
+        // full cluster
+        runAction(Lists.newArrayList("--compact_strategy", "full"));
+        checkSnapshot(table);
+        splits = readBuilder.newScan().plan().splits();
+        List<String> result5 = getResult(readBuilder.newRead(), splits, readBuilder.readType());
+        List<String> expected5 = new ArrayList<>();
+        for (int pt = 1; pt >= 0; pt--) {
+            expected5.add(String.format("+I[0, 0, %s]", pt));
+            expected5.add(String.format("+I[0, 1, %s]", pt));
+            expected5.add(String.format("+I[1, 0, %s]", pt));
+            expected5.add(String.format("+I[1, 1, %s]", pt));
+            expected5.add(String.format("+I[0, 2, %s]", pt));
+            expected5.add(String.format("+I[0, 3, %s]", pt));
+            expected5.add(String.format("+I[1, 2, %s]", pt));
+            expected5.add(String.format("+I[1, 3, %s]", pt));
+            expected5.add(String.format("+I[2, 0, %s]", pt));
+            expected5.add(String.format("+I[2, 1, %s]", pt));
+            expected5.add(String.format("+I[3, 0, %s]", pt));
+            expected5.add(String.format("+I[3, 1, %s]", pt));
+            expected5.add(String.format("+I[2, 2, %s]", pt));
+            expected5.add(String.format("+I[2, 3, %s]", pt));
+            expected5.add(String.format("+I[3, 2, %s]", pt));
+            expected5.add(String.format("+I[3, 3, %s]", pt));
+        }
+        assertThat(splits.size()).isEqualTo(2);
+        assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(1);
+        assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5);
+        assertThat(result5).containsExactlyElementsOf(expected5);
+    }
+
     protected FileStoreTable createTable(String partitionKeys) throws Exception {
         return createTable(partitionKeys, commonOptions());
     }
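
Side note for readers of this commit: the new ITCase drives the feature entirely through table options. The sketch below is not part of the patch; the class name is hypothetical and it only mirrors the CoreOptions keys exercised in testClusterWithBucket above, showing roughly how a bucketed append table could be configured for incremental clustering:

    // Hypothetical helper, mirroring the option setup used in
    // IncrementalClusterActionITCase#testClusterWithBucket above.
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.paimon.CoreOptions;

    public class ClusteredBucketTableOptionsSketch {
        public static Map<String, String> options() {
            Map<String, String> options = new HashMap<>();
            // cluster on f0,f1 with a space-filling curve ("zorder", "hilbert" or "order")
            options.put(CoreOptions.CLUSTERING_COLUMNS.key(), "f0,f1");
            options.put(CoreOptions.CLUSTERING_STRATEGY.key(), "zorder");
            // turn on incremental clustering
            options.put(CoreOptions.CLUSTERING_INCREMENTAL.key(), "true");
            // fixed-bucket append table, bucketed by pt
            options.put(CoreOptions.BUCKET.key(), "2");
            options.put(CoreOptions.BUCKET_KEY.key(), "pt");
            // ordered bucket appends must be off for bucketClusterEnabled() to hold
            options.put(CoreOptions.BUCKET_APPEND_ORDERED.key(), "false");
            return options;
        }
    }

The resulting map would then be passed into the Schema when creating the table, the same way createTable(...) in the ITCase does.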
