This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c20198b219 [core] Support incremental clustering for append bucketed table (#6961)
c20198b219 is described below
commit c20198b219f3c0d20cd5cea1681454c108701d37
Author: LsomeYeah <[email protected]>
AuthorDate: Thu Jan 8 13:43:42 2026 +0800
[core] Support incremental clustering for append bucketed table (#6961)
---
.../main/java/org/apache/paimon/CoreOptions.java | 6 +
.../org/apache/paimon/AppendOnlyFileStore.java | 3 +-
.../cluster/BucketedAppendClusterManager.java | 219 +++++++++++++++++++++
.../append/cluster/BucketedAppendLevels.java | 137 +++++++++++++
.../apache/paimon/append/cluster/HibertSorter.java | 80 ++++++++
.../apache/paimon/append/cluster/OrderSorter.java | 86 ++++++++
.../org/apache/paimon/append/cluster/Sorter.java | 157 +++++++++++++++
.../apache/paimon/append/cluster/ZorderSorter.java | 80 ++++++++
.../paimon/operation/BaseAppendFileStoreWrite.java | 38 ++++
.../operation/BucketedAppendFileStoreWrite.java | 18 +-
.../org/apache/paimon/schema/SchemaValidation.java | 13 +-
.../cluster/BucketedAppendClusterManagerTest.java | 154 +++++++++++++++
.../cluster/IncrementalClusterManagerTest.java | 12 --
.../apache/paimon/append/cluster/SorterTest.java | 139 +++++++++++++
.../apache/paimon/flink/action/CompactAction.java | 6 +-
.../action/IncrementalClusterActionITCase.java | 165 ++++++++++++++++
16 files changed, 1294 insertions(+), 19 deletions(-)
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 78270d7523..ade2ac9c37 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -3193,6 +3193,12 @@ public class CoreOptions implements Serializable {
return options.get(CLUSTERING_INCREMENTAL);
}
+ public boolean bucketClusterEnabled() {
+ return !bucketAppendOrdered()
+ && !deletionVectorsEnabled()
+ && clusteringIncrementalEnabled();
+ }
+
public Duration clusteringHistoryPartitionIdleTime() {
return options.get(CLUSTERING_HISTORY_PARTITION_IDLE_TIME);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index f3f0859d8c..51cfe58b42 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -141,7 +141,8 @@ public class AppendOnlyFileStore extends AbstractFileStore<InternalRow> {
newScan(),
options,
dvMaintainerFactory,
- tableName);
+ tableName,
+ schemaManager);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/cluster/BucketedAppendClusterManager.java b/paimon-core/src/main/java/org/apache/paimon/append/cluster/BucketedAppendClusterManager.java
new file mode 100644
index 0000000000..154d10c9d3
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/append/cluster/BucketedAppendClusterManager.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append.cluster;
+
+import org.apache.paimon.AppendOnlyFileStore;
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.compact.CompactFutureManager;
+import org.apache.paimon.compact.CompactResult;
+import org.apache.paimon.compact.CompactTask;
+import org.apache.paimon.compact.CompactUnit;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.mergetree.LevelSortedRun;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.utils.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+/** Cluster manager for {@link AppendOnlyFileStore}. */
+public class BucketedAppendClusterManager extends CompactFutureManager {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BucketedAppendClusterManager.class);
+
+ private final ExecutorService executor;
+ private final BucketedAppendLevels levels;
+ private final IncrementalClusterStrategy strategy;
+ private final CompactRewriter rewriter;
+
+ public BucketedAppendClusterManager(
+ ExecutorService executor,
+ List<DataFileMeta> restored,
+ SchemaManager schemaManager,
+ List<String> clusterKeys,
+ int maxSizeAmp,
+ int sizeRatio,
+ int numRunCompactionTrigger,
+ int numLevels,
+ CompactRewriter rewriter) {
+ this.executor = executor;
+ this.levels = new BucketedAppendLevels(restored, numLevels);
+ this.strategy =
+ new IncrementalClusterStrategy(
+ schemaManager, clusterKeys, maxSizeAmp, sizeRatio, numRunCompactionTrigger);
+ this.rewriter = rewriter;
+ }
+
+ @Override
+ public boolean shouldWaitForLatestCompaction() {
+ return false;
+ }
+
+ @Override
+ public boolean shouldWaitForPreparingCheckpoint() {
+ return false;
+ }
+
+ @Override
+ public void addNewFile(DataFileMeta file) {
+ levels.addLevel0File(file);
+ }
+
+ @Override
+ public List<DataFileMeta> allFiles() {
+ return levels.allFiles();
+ }
+
+ @Override
+ public void triggerCompaction(boolean fullCompaction) {
+ Optional<CompactUnit> optionalUnit;
+ List<LevelSortedRun> runs = levels.levelSortedRuns();
+ if (fullCompaction) {
+ Preconditions.checkState(
+ taskFuture == null,
+ "A compaction task is still running while the user "
+ + "forces a new compaction. This is unexpected.");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Trigger forced full compaction. Picking from the
following runs\n{}",
+ runs);
+ }
+ optionalUnit = strategy.pick(levels.numberOfLevels(), runs, true);
+ } else {
+ if (taskFuture != null) {
+ return;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Trigger normal compaction. Picking from the
following runs\n{}", runs);
+ }
+ optionalUnit =
+ strategy.pick(levels.numberOfLevels(), runs, false)
+ .filter(unit -> !unit.files().isEmpty());
+ }
+
+ optionalUnit.ifPresent(
+ unit -> {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Submit compaction with files (name, level,
size): "
+ + levels.levelSortedRuns().stream()
+ .flatMap(lsr ->
lsr.run().files().stream())
+ .map(
+ file ->
+ String.format(
+ "(%s,
%d, %d)",
+
file.fileName(),
+
file.level(),
+
file.fileSize()))
+ .collect(Collectors.joining(",
")));
+ }
+ submitCompaction(unit);
+ });
+ }
+
+ private void submitCompaction(CompactUnit unit) {
+
+ BucketedAppendClusterTask task =
+ new BucketedAppendClusterTask(unit.files(), unit.outputLevel(), rewriter);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Pick these files (name, level, size) for {} compaction:
{}",
+ task.getClass().getSimpleName(),
+ unit.files().stream()
+ .map(
+ file ->
+ String.format(
+ "(%s, %d, %d)",
+ file.fileName(), file.level(), file.fileSize()))
+ .collect(Collectors.joining(", ")));
+ }
+ taskFuture = executor.submit(task);
+ }
+
+ @Override
+ public Optional<CompactResult> getCompactionResult(boolean blocking)
+ throws ExecutionException, InterruptedException {
+ Optional<CompactResult> result = innerGetCompactionResult(blocking);
+ result.ifPresent(
+ r -> {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Update levels in compact manager with these
changes:\nBefore:\n{}\nAfter:\n{}",
+ r.before(),
+ r.after());
+ }
+ levels.update(r.before(), r.after());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Levels in compact manager updated. Current
runs are\n{}",
+ levels.levelSortedRuns());
+ }
+ });
+ return result;
+ }
+
+ @Override
+ public void close() throws IOException {}
+
+ @VisibleForTesting
+ public BucketedAppendLevels levels() {
+ return levels;
+ }
+
+ /** A {@link CompactTask} impl for clustering of append bucketed table. */
+ public static class BucketedAppendClusterTask extends CompactTask {
+
+ private final List<DataFileMeta> toCluster;
+ private final int outputLevel;
+ private final CompactRewriter rewriter;
+
+ public BucketedAppendClusterTask(
+ List<DataFileMeta> toCluster, int outputLevel, CompactRewriter rewriter) {
+ super(null);
+ this.toCluster = toCluster;
+ this.outputLevel = outputLevel;
+ this.rewriter = rewriter;
+ }
+
+ @Override
+ protected CompactResult doCompact() throws Exception {
+ List<DataFileMeta> rewrite = rewriter.rewrite(toCluster);
+ return new CompactResult(toCluster, upgrade(rewrite));
+ }
+
+ protected List<DataFileMeta> upgrade(List<DataFileMeta> files) {
+ return files.stream()
+ .map(file -> file.upgrade(outputLevel))
+ .collect(Collectors.toList());
+ }
+ }
+
+ /** Compact rewriter for append-only table. */
+ public interface CompactRewriter {
+ List<DataFileMeta> rewrite(List<DataFileMeta> compactBefore) throws Exception;
+ }
+}
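A minimal driving sketch for the manager above, mirroring BucketedAppendClusterManagerTest further down (the executor, restored file list, write handle, partition and bucket are placeholders): restore the per-bucket levels, let IncrementalClusterStrategy pick a CompactUnit, run the rewrite asynchronously, then fold the result back into the levels.

BucketedAppendClusterManager manager =
        new BucketedAppendClusterManager(
                Executors.newSingleThreadExecutor(),
                restoredFiles,                       // existing DataFileMeta of this bucket
                schemaManager,
                options.clusteringColumns(),
                options.maxSizeAmplificationPercent(),
                options.sortedRunSizeRatio(),
                options.numSortedRunCompactionTrigger(),
                options.numLevels(),
                files -> write.clusterRewrite(partition, bucket, files));
manager.triggerCompaction(false);                    // pick + submit only if the strategy fires
Optional<CompactResult> result = manager.getCompactionResult(true);  // blocking fetch
// result.before()/after() carry the clustered-away inputs and the upgraded outputs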
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/cluster/BucketedAppendLevels.java b/paimon-core/src/main/java/org/apache/paimon/append/cluster/BucketedAppendLevels.java
new file mode 100644
index 0000000000..ba62fd9748
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/append/cluster/BucketedAppendLevels.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append.cluster;
+
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.mergetree.LevelSortedRun;
+import org.apache.paimon.mergetree.SortedRun;
+import org.apache.paimon.utils.Preconditions;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.emptyList;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** A class which stores all level files in append bucketed table. */
+public class BucketedAppendLevels {
+
+ private final HashSet<DataFileMeta> level0;
+
+ private final List<SortedRun> levels;
+
+ public BucketedAppendLevels(List<DataFileMeta> inputFiles, int numLevels) {
+ int restoredNumLevels =
+ Math.max(
+ numLevels,
+ inputFiles.stream().mapToInt(DataFileMeta::level).max().orElse(-1) + 1);
+ checkArgument(restoredNumLevels > 1, "Number of levels must be at least 2.");
+ this.level0 = new HashSet<>();
+ this.levels = new ArrayList<>();
+ for (int i = 1; i < restoredNumLevels; i++) {
+ levels.add(SortedRun.empty());
+ }
+
+ Map<Integer, List<DataFileMeta>> levelMap = new HashMap<>();
+ for (DataFileMeta file : inputFiles) {
+ levelMap.computeIfAbsent(file.level(), level -> new ArrayList<>()).add(file);
+ }
+ levelMap.forEach((level, files) -> updateLevel(level, emptyList(), files));
+
+ Preconditions.checkState(
+ level0.size() + levels.stream().mapToInt(r -> r.files().size()).sum()
+ == inputFiles.size(),
+ "Number of files stored in Levels does not equal to the size
of inputFiles. This is unexpected.");
+ }
+
+ public void addLevel0File(DataFileMeta file) {
+ checkArgument(file.level() == 0);
+ level0.add(file);
+ }
+
+ public SortedRun runOfLevel(int level) {
+ checkArgument(level > 0, "Level0 does not have one single sorted run.");
+ return levels.get(level - 1);
+ }
+
+ public int numberOfLevels() {
+ return levels.size() + 1;
+ }
+
+ public int maxLevel() {
+ return levels.size();
+ }
+
+ public List<DataFileMeta> allFiles() {
+ List<DataFileMeta> files = new ArrayList<>();
+ List<LevelSortedRun> runs = levelSortedRuns();
+ for (LevelSortedRun run : runs) {
+ files.addAll(run.run().files());
+ }
+ return files;
+ }
+
+ public List<LevelSortedRun> levelSortedRuns() {
+ List<LevelSortedRun> runs = new ArrayList<>();
+ level0.forEach(file -> runs.add(new LevelSortedRun(0, SortedRun.fromSingle(file))));
+ for (int i = 0; i < levels.size(); i++) {
+ SortedRun run = levels.get(i);
+ if (run.nonEmpty()) {
+ runs.add(new LevelSortedRun(i + 1, run));
+ }
+ }
+ return runs;
+ }
+
+ public void update(List<DataFileMeta> before, List<DataFileMeta> after) {
+ Map<Integer, List<DataFileMeta>> groupedBefore = groupByLevel(before);
+ Map<Integer, List<DataFileMeta>> groupedAfter = groupByLevel(after);
+ for (int i = 0; i < numberOfLevels(); i++) {
+ updateLevel(
+ i,
+ groupedBefore.getOrDefault(i, emptyList()),
+ groupedAfter.getOrDefault(i, emptyList()));
+ }
+ }
+
+ private void updateLevel(int level, List<DataFileMeta> before, List<DataFileMeta> after) {
+ if (before.isEmpty() && after.isEmpty()) {
+ return;
+ }
+
+ if (level == 0) {
+ before.forEach(level0::remove);
+ level0.addAll(after);
+ } else {
+ List<DataFileMeta> files = new ArrayList<>(runOfLevel(level).files());
+ files.removeAll(before);
+ files.addAll(after);
+ levels.set(level - 1, SortedRun.fromSorted(files));
+ }
+ }
+
+ private Map<Integer, List<DataFileMeta>> groupByLevel(List<DataFileMeta> files) {
+ return files.stream()
+ .collect(Collectors.groupingBy(DataFileMeta::level, Collectors.toList()));
+ }
+}
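A rough sketch of the bookkeeping above (file metas and the result object are placeholders): each level-0 file is its own sorted run, levels >= 1 hold at most one run each, and a finished cluster result is folded back in through update().

BucketedAppendLevels levels = new BucketedAppendLevels(restoredFiles, options.numLevels());
levels.addLevel0File(newlyWrittenFile);                // fresh writes always land on level 0
List<LevelSortedRun> runs = levels.levelSortedRuns();  // one run per level-0 file + non-empty levels
levels.update(result.before(), result.after());        // drop compacted inputs, add clustered outputs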
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/cluster/HibertSorter.java b/paimon-core/src/main/java/org/apache/paimon/append/cluster/HibertSorter.java
new file mode 100644
index 0000000000..fcd59ce0d3
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/append/cluster/HibertSorter.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append.cluster;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.JoinedRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.reader.RecordReaderIterator;
+import org.apache.paimon.sort.hilbert.HilbertIndexer;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/** Hilbert sorter for clustering. */
+public class HibertSorter extends Sorter {
+
+ private static final RowType KEY_TYPE =
+ new RowType(Collections.singletonList(new DataField(0, "H_INDEX", DataTypes.BYTES())));
+
+ private final HibertKeyAbstract hilbertKeyAbstract;
+
+ public HibertSorter(
+ RecordReaderIterator<InternalRow> reader,
+ RowType valueType,
+ CoreOptions options,
+ List<String> orderColNames,
+ IOManager ioManager) {
+ super(reader, KEY_TYPE, valueType, options, ioManager);
+ this.hilbertKeyAbstract = new HibertKeyAbstract(valueType, orderColNames);
+ this.hilbertKeyAbstract.open();
+ }
+
+ @Override
+ public InternalRow assignSortKey(InternalRow row) {
+ byte[] key = hilbertKeyAbstract.apply(row);
+ return new JoinedRow(GenericRow.of(key), row);
+ }
+
+ private static class HibertKeyAbstract implements KeyAbstract<byte[]> {
+
+ private final HilbertIndexer hilbertIndexer;
+
+ public HibertKeyAbstract(RowType rowType, List<String> orderColNames) {
+ hilbertIndexer = new HilbertIndexer(rowType, orderColNames);
+ }
+
+ @Override
+ public void open() {
+ hilbertIndexer.open();
+ }
+
+ @Override
+ public byte[] apply(InternalRow value) {
+ byte[] hilbert = hilbertIndexer.index(value);
+ return Arrays.copyOf(hilbert, hilbert.length);
+ }
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/cluster/OrderSorter.java b/paimon-core/src/main/java/org/apache/paimon/append/cluster/OrderSorter.java
new file mode 100644
index 0000000000..ee2ace3f3a
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/append/cluster/OrderSorter.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append.cluster;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.JoinedRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.reader.RecordReaderIterator;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Projection;
+
+import java.util.List;
+
+import static org.apache.paimon.table.PrimaryKeyTableUtils.addKeyNamePrefix;
+
+/** Order sorter for clustering. */
+public class OrderSorter extends Sorter {
+
+ private final OrderKeyAbstract orderKeyAbstract;
+
+ public OrderSorter(
+ RecordReaderIterator<InternalRow> reader,
+ RowType valueType,
+ CoreOptions options,
+ List<String> orderColNames,
+ IOManager ioManager) {
+ super(reader, keyType(valueType, orderColNames), valueType, options, ioManager);
+ this.orderKeyAbstract = new OrderKeyAbstract(valueType, orderColNames);
+ this.orderKeyAbstract.open();
+ }
+
+ @Override
+ public InternalRow assignSortKey(InternalRow row) {
+ InternalRow key = orderKeyAbstract.apply(row);
+ return new JoinedRow(key, row);
+ }
+
+ private static RowType keyType(RowType valueType, List<String> orderColNames) {
+ List<String> fieldNames = valueType.getFieldNames();
+ int[] keyProjectionMap = orderColNames.stream().mapToInt(fieldNames::indexOf).toArray();
+ return addKeyNamePrefix(Projection.of(keyProjectionMap).project(valueType));
+ }
+
+ private static class OrderKeyAbstract implements KeyAbstract<InternalRow> {
+
+ private final RowType valueRowType;
+ private final int[] keyProjectionMap;
+
+ private transient org.apache.paimon.codegen.Projection keyProjection;
+
+ public OrderKeyAbstract(RowType rowType, List<String> orderColNames) {
+ this.valueRowType = rowType;
+ List<String> fieldNames = rowType.getFieldNames();
+ this.keyProjectionMap = orderColNames.stream().mapToInt(fieldNames::indexOf).toArray();
+ }
+
+ @Override
+ public void open() {
+ // use key gen to speed up projection
+ keyProjection = CodeGenUtils.newProjection(valueRowType, keyProjectionMap);
+ }
+
+ @Override
+ public InternalRow apply(InternalRow value) {
+ return keyProjection.apply(value).copy();
+ }
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/cluster/Sorter.java b/paimon-core/src/main/java/org/apache/paimon/append/cluster/Sorter.java
new file mode 100644
index 0000000000..60dbecdde0
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/append/cluster/Sorter.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append.cluster;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.compression.CompressOptions;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.reader.RecordReaderIterator;
+import org.apache.paimon.sort.BinaryExternalSortBuffer;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.KeyProjectedRow;
+import org.apache.paimon.utils.MutableObjectIterator;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.IntStream;
+
+/** Sorter for clustering. */
+public abstract class Sorter {
+
+ protected final RecordReaderIterator<InternalRow> reader;
+ protected final RowType keyType;
+ protected final RowType longRowType;
+ protected final int[] valueProjectionMap;
+ private final int arity;
+
+ private final transient IOManager ioManager;
+ private final transient BinaryExternalSortBuffer buffer;
+
+ public Sorter(
+ RecordReaderIterator<InternalRow> reader,
+ RowType keyType,
+ RowType valueType,
+ CoreOptions options,
+ IOManager ioManager) {
+ this.reader = reader;
+ this.keyType = keyType;
+ int keyFieldCount = keyType.getFieldCount();
+ int valueFieldCount = valueType.getFieldCount();
+ this.valueProjectionMap = new int[valueFieldCount];
+ for (int i = 0; i < valueFieldCount; i++) {
+ this.valueProjectionMap[i] = i + keyFieldCount;
+ }
+ List<DataField> keyFields = keyType.getFields();
+ List<DataField> dataFields = valueType.getFields();
+ List<DataField> fields = new ArrayList<>();
+ fields.addAll(keyFields);
+ fields.addAll(dataFields);
+ this.longRowType = new RowType(fields);
+ this.arity = longRowType.getFieldCount();
+
+ long maxMemory = options.writeBufferSize();
+ int pageSize = options.pageSize();
+ int spillSortMaxNumFiles = options.localSortMaxNumFileHandles();
+ CompressOptions spillCompression = options.spillCompressOptions();
+ MemorySize maxDiskSize = options.writeBufferSpillDiskSize();
+ boolean sequenceOrder = options.sequenceFieldSortOrderIsAscending();
+
+ this.ioManager = ioManager;
+ this.buffer =
+ BinaryExternalSortBuffer.create(
+ ioManager,
+ longRowType,
+ IntStream.range(0, keyType.getFieldCount()).toArray(),
+ maxMemory,
+ pageSize,
+ spillSortMaxNumFiles,
+ spillCompression,
+ maxDiskSize,
+ sequenceOrder);
+ }
+
+ public abstract InternalRow assignSortKey(InternalRow row);
+
+ public InternalRow removeSortKey(InternalRow rowWithKey) {
+ KeyProjectedRow keyProjectedRow = new KeyProjectedRow(valueProjectionMap);
+ return keyProjectedRow.replaceRow(rowWithKey);
+ }
+
+ public MutableObjectIterator<BinaryRow> sort() throws IOException {
+ while (reader.hasNext()) {
+ InternalRow row = reader.next();
+ InternalRow rowWithKey = assignSortKey(row);
+ buffer.write(rowWithKey);
+ }
+
+ if (buffer.size() > 0) {
+ return buffer.sortedIterator();
+ } else {
+ throw new IllegalStateException("numRecords after sorting is 0.");
+ }
+ }
+
+ public int arity() {
+ return arity;
+ }
+
+ public void close() throws Exception {
+ if (buffer != null) {
+ buffer.clear();
+ }
+ if (ioManager != null) {
+ ioManager.close();
+ }
+ }
+
+ public static Sorter getSorter(
+ RecordReaderIterator<InternalRow> reader,
+ IOManager ioManager,
+ RowType rowType,
+ CoreOptions options) {
+ CoreOptions.OrderType clusterCurve = options.clusteringStrategy(options.clusteringColumns().size());
+ switch (clusterCurve) {
+ case HILBERT:
+ return new HibertSorter(
+ reader, rowType, options, options.clusteringColumns(), ioManager);
+ case ZORDER:
+ return new ZorderSorter(
+ reader, rowType, options, options.clusteringColumns(), ioManager);
+ case ORDER:
+ return new OrderSorter(
+ reader, rowType, options, options.clusteringColumns(), ioManager);
+ default:
+ throw new IllegalArgumentException("cannot match cluster type: " + clusterCurve);
+ }
+ }
+
+ /** Abstract key from a row data. */
+ public interface KeyAbstract<KEY> extends Serializable {
+ default void open() {}
+
+ KEY apply(InternalRow value);
+ }
+}
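The sort loop built on top of this class by clusterRewrite(...) looks roughly like the sketch below (reader, writer and ioManager are placeholders; see BaseAppendFileStoreWrite further down for the actual call site):

Sorter sorter = Sorter.getSorter(reader, ioManager, rowType, options);
MutableObjectIterator<BinaryRow> sorted = sorter.sort();   // external sort, spills to disk if needed
BinaryRow reuse = new BinaryRow(sorter.arity());
while ((reuse = sorted.next(reuse)) != null) {
    InternalRow row = sorter.removeSortKey(reuse);         // strip the prepended curve key
    writer.write(row);                                     // rewrite records in clustered order
}
sorter.close();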
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/cluster/ZorderSorter.java b/paimon-core/src/main/java/org/apache/paimon/append/cluster/ZorderSorter.java
new file mode 100644
index 0000000000..57e95d06c6
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/append/cluster/ZorderSorter.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append.cluster;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.JoinedRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.reader.RecordReaderIterator;
+import org.apache.paimon.sort.zorder.ZIndexer;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/** Z-order sorter for clustering. */
+public class ZorderSorter extends Sorter {
+
+ private static final RowType KEY_TYPE =
+ new RowType(Collections.singletonList(new DataField(0, "Z_INDEX", DataTypes.BYTES())));
+
+ private final ZorderKeyAbstract zorderKeyAbstract;
+
+ public ZorderSorter(
+ RecordReaderIterator<InternalRow> reader,
+ RowType valueType,
+ CoreOptions options,
+ List<String> orderColNames,
+ IOManager ioManager) {
+ super(reader, KEY_TYPE, valueType, options, ioManager);
+ this.zorderKeyAbstract = new ZorderKeyAbstract(valueType, options, orderColNames);
+ this.zorderKeyAbstract.open();
+ }
+
+ @Override
+ public InternalRow assignSortKey(InternalRow row) {
+ byte[] key = zorderKeyAbstract.apply(row);
+ return new JoinedRow(GenericRow.of(key), row);
+ }
+
+ private static class ZorderKeyAbstract implements KeyAbstract<byte[]> {
+
+ private final ZIndexer zIndexer;
+
+ public ZorderKeyAbstract(RowType rowType, CoreOptions options, List<String> orderColNames) {
+ zIndexer = new ZIndexer(rowType, orderColNames, options.varTypeSize());
+ }
+
+ @Override
+ public void open() {
+ zIndexer.open();
+ }
+
+ @Override
+ public byte[] apply(InternalRow value) {
+ byte[] zorder = zIndexer.index(value);
+ return Arrays.copyOf(zorder, zorder.length);
+ }
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
index 6dee947308..1c63f41b02 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
@@ -21,6 +21,7 @@ package org.apache.paimon.operation;
import org.apache.paimon.AppendOnlyFileStore;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.append.AppendOnlyWriter;
+import org.apache.paimon.append.cluster.Sorter;
import org.apache.paimon.compact.CompactManager;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
@@ -41,6 +42,7 @@ import org.apache.paimon.utils.ExceptionUtils;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.IOExceptionSupplier;
import org.apache.paimon.utils.LongCounter;
+import org.apache.paimon.utils.MutableObjectIterator;
import org.apache.paimon.utils.RecordWriter;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.StatsCollectorFactories;
@@ -205,6 +207,42 @@ public abstract class BaseAppendFileStoreWrite extends MemoryFileStoreWrite<Inte
return rewriter.result();
}
+ public List<DataFileMeta> clusterRewrite(
+ BinaryRow partition, int bucket, List<DataFileMeta> toCluster) throws Exception {
+ RecordReaderIterator<InternalRow> reader =
+ createFilesIterator(partition, bucket, toCluster, null);
+
+ // sort and rewrite
+ Exception collectedExceptions = null;
+ Sorter sorter = Sorter.getSorter(reader, ioManager, rowType, options);
+ RowDataRollingFileWriter rewriter =
+ createRollingFileWriter(
+ partition, bucket, new LongCounter(toCluster.get(0).minSequenceNumber()));
+ try {
+ MutableObjectIterator<BinaryRow> sorted = sorter.sort();
+ BinaryRow binaryRow = new BinaryRow(sorter.arity());
+ while ((binaryRow = sorted.next(binaryRow)) != null) {
+ InternalRow rowRemovedKey = sorter.removeSortKey(binaryRow);
+ rewriter.write(rowRemovedKey);
+ }
+ } catch (Exception e) {
+ collectedExceptions = e;
+ } finally {
+ try {
+ rewriter.close();
+ sorter.close();
+ } catch (Exception e) {
+ collectedExceptions = ExceptionUtils.firstOrSuppressed(e, collectedExceptions);
+ }
+ }
+
+ if (collectedExceptions != null) {
+ throw collectedExceptions;
+ }
+
+ return rewriter.result();
+ }
+
private RowDataRollingFileWriter createRollingFileWriter(
BinaryRow partition, int bucket, LongCounter seqNumCounter) {
return new RowDataRollingFileWriter(
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java
index a3f2083e25..b93c2b5115 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java
@@ -20,6 +20,7 @@ package org.apache.paimon.operation;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.append.BucketedAppendCompactManager;
+import org.apache.paimon.append.cluster.BucketedAppendClusterManager;
import org.apache.paimon.compact.CompactManager;
import org.apache.paimon.compact.NoopCompactManager;
import org.apache.paimon.data.BinaryRow;
@@ -28,6 +29,7 @@ import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.SnapshotManager;
@@ -42,6 +44,7 @@ import java.util.function.Function;
public class BucketedAppendFileStoreWrite extends BaseAppendFileStoreWrite {
private final String commitUser;
+ private final SchemaManager schemaManager;
public BucketedAppendFileStoreWrite(
FileIO fileIO,
@@ -55,7 +58,8 @@ public class BucketedAppendFileStoreWrite extends BaseAppendFileStoreWrite {
FileStoreScan scan,
CoreOptions options,
@Nullable BucketedDvMaintainer.Factory dvMaintainerFactory,
- String tableName) {
+ String tableName,
+ SchemaManager schemaManager) {
super(
fileIO,
read,
@@ -72,6 +76,7 @@ public class BucketedAppendFileStoreWrite extends BaseAppendFileStoreWrite {
super.withIgnorePreviousFiles(options.writeOnly());
}
this.commitUser = commitUser;
+ this.schemaManager = schemaManager;
}
@Override
@@ -94,6 +99,17 @@ public class BucketedAppendFileStoreWrite extends BaseAppendFileStoreWrite {
@Nullable BucketedDvMaintainer dvMaintainer) {
if (options.writeOnly()) {
return new NoopCompactManager();
+ } else if (options.bucketClusterEnabled()) {
+ return new BucketedAppendClusterManager(
+ compactExecutor,
+ restoredFiles,
+ schemaManager,
+ options.clusteringColumns(),
+ options.maxSizeAmplificationPercent(),
+ options.sortedRunSizeRatio(),
+ options.numSortedRunCompactionTrigger(),
+ options.numLevels(),
+ files -> clusterRewrite(partition, bucket, files));
} else {
Function<String, DeletionVector> dvFactory =
dvMaintainer != null
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index abbc2e4912..1378ecc586 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -673,14 +673,19 @@ public class SchemaValidation {
private static void validateIncrementalClustering(TableSchema schema, CoreOptions options) {
if (options.clusteringIncrementalEnabled()) {
- checkArgument(
- options.bucket() == -1,
- "Cannot define %s for incremental clustering table, it
only support bucket = -1",
- CoreOptions.BUCKET.key());
checkArgument(
schema.primaryKeys().isEmpty(),
"Cannot define %s for incremental clustering table.",
PRIMARY_KEY.key());
+ if (options.bucket() != -1) {
+ checkArgument(
+ !options.bucketAppendOrdered(),
+ "%s must be false for incremental clustering table.",
+ CoreOptions.BUCKET_APPEND_ORDERED.key());
+ checkArgument(
+ !options.deletionVectorsEnabled(),
+ "Cannot enable deletion-vectors for incremental
clustering table which bucket is not -1.");
+ }
}
}
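In short, incremental clustering no longer insists on bucket = -1: a fixed-bucket append table now passes validation as long as bucket-append-ordered is false and deletion vectors stay off. A hedged illustration (option keys taken from CoreOptions, values made up; DELETION_VECTORS_ENABLED is assumed to be the existing constant):

Map<String, String> accepted = new HashMap<>();
accepted.put(CoreOptions.CLUSTERING_INCREMENTAL.key(), "true");
accepted.put(CoreOptions.BUCKET.key(), "2");
accepted.put(CoreOptions.BUCKET_KEY.key(), "pt");
accepted.put(CoreOptions.BUCKET_APPEND_ORDERED.key(), "false");    // must be false when bucket != -1

Map<String, String> rejected = new HashMap<>(accepted);
rejected.put(CoreOptions.DELETION_VECTORS_ENABLED.key(), "true");  // still rejected by the check above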
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/cluster/BucketedAppendClusterManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/append/cluster/BucketedAppendClusterManagerTest.java
new file mode 100644
index 0000000000..0d2d60e699
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/append/cluster/BucketedAppendClusterManagerTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append.cluster;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.FileSystemCatalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.compact.CompactResult;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.operation.BaseAppendFileStoreWrite;
+import org.apache.paimon.reader.RecordReaderIterator;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.AppendOnlyFileStoreTable;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.StreamTableCommit;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link BucketedAppendClusterManager}. */
+public class BucketedAppendClusterManagerTest {
+
+ @TempDir java.nio.file.Path tempDir;
+ @TempDir java.nio.file.Path ioManagerTempDir;
+
+ FileStoreTable table;
+ BaseAppendFileStoreWrite write;
+ StreamTableCommit commit;
+
+ @BeforeEach
+ public void before() throws Exception {
+ table = createFileStoreTable();
+ write =
+ (BaseAppendFileStoreWrite)
+ table.store()
+ .newWrite("ss")
+ .withIOManager(IOManager.create(ioManagerTempDir.toString()));
+ commit = table.newStreamWriteBuilder().newCommit();
+
+ for (int i = 0; i < 3; i++) {
+ for (int j = 0; j < 3; j++) {
+ write.write(BinaryRow.EMPTY_ROW, 0, GenericRow.of(0, i, j));
+ commit.commit(i, write.prepareCommit(false, i));
+ }
+ }
+ }
+
+ @Test
+ public void testBucketedAppendClusterTask() throws Exception {
+ List<DataFileMeta> toCluster =
+ table.newSnapshotReader().read().dataSplits().get(0).dataFiles();
+
+ BucketedAppendClusterManager.BucketedAppendClusterTask task =
+ new BucketedAppendClusterManager.BucketedAppendClusterTask(
+ toCluster, 5, files -> write.clusterRewrite(BinaryRow.EMPTY_ROW, 0, files));
+
+ CompactResult result = task.doCompact();
+ assertThat(result.before().size()).isEqualTo(9);
+ assertThat(result.after().size()).isEqualTo(1);
+ List<String> rows = new ArrayList<>();
+ try (RecordReaderIterator<InternalRow> clusterRows =
+ new RecordReaderIterator<>(
+ ((AppendOnlyFileStoreTable) table)
+ .store()
+ .newRead()
+ .createReader(BinaryRow.EMPTY_ROW, 0, result.after(), null))) {
+ while (clusterRows.hasNext()) {
+ InternalRow row = clusterRows.next();
+ rows.add(String.format("%d,%d", row.getInt(1), row.getInt(2)));
+ }
+ }
+
+ assertThat(rows)
+ .containsExactly("0,0", "0,1", "1,0", "1,1", "0,2", "1,2",
"2,0", "2,1", "2,2");
+ }
+
+ @Test
+ public void testTriggerCompaction() throws Exception {
+ List<DataFileMeta> toCluster =
+ table.newSnapshotReader().read().dataSplits().get(0).dataFiles();
+ CoreOptions options = table.coreOptions();
+ BucketedAppendClusterManager manager =
+ new BucketedAppendClusterManager(
+ Executors.newSingleThreadExecutor(),
+ toCluster,
+ table.schemaManager(),
+ options.clusteringColumns(),
+ options.maxSizeAmplificationPercent(),
+ options.sortedRunSizeRatio(),
+ options.numSortedRunCompactionTrigger(),
+ options.numLevels(),
+ files -> write.clusterRewrite(BinaryRow.EMPTY_ROW, 0, files));
+ assertThat(manager.levels().levelSortedRuns().size()).isEqualTo(9);
+
+ manager.triggerCompaction(false);
+
+ CompactResult compactResult = manager.getCompactionResult(true).get();
+ assertThat(compactResult.before().size()).isEqualTo(9);
+ assertThat(compactResult.after().size()).isEqualTo(1);
+
+ assertThat(manager.levels().levelSortedRuns().size()).isEqualTo(1);
+
assertThat(manager.levels().levelSortedRuns().get(0).level()).isEqualTo(5);
+ }
+
+ private FileStoreTable createFileStoreTable() throws Exception {
+ Catalog catalog = new FileSystemCatalog(LocalFileIO.create(), new Path(tempDir.toString()));
+ Schema schema =
+ Schema.newBuilder()
+ .column("f0", DataTypes.INT())
+ .column("f1", DataTypes.INT())
+ .column("f2", DataTypes.INT())
+ .option("bucket", "1")
+ .option("bucket-key", "f0")
+ .option("compaction.min.file-num", "10")
+ .option("clustering.columns", "f1,f2")
+ .option("clustering.strategy", "zorder")
+ .build();
+ Identifier identifier = Identifier.create("default", "test");
+ catalog.createDatabase("default", false);
+ catalog.createTable(identifier, schema, false);
+ return (FileStoreTable) catalog.getTable(identifier);
+ }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java
index 516fa63b55..cf81cdf85f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/append/cluster/IncrementalClusterManagerTest.java
@@ -57,18 +57,6 @@ public class IncrementalClusterManagerTest {
@TempDir java.nio.file.Path tempDir;
- @Test
- public void testNonUnAwareBucketTable() {
- Map<String, String> options = new HashMap<>();
- options.put(CoreOptions.BUCKET.key(), "1");
- options.put(CoreOptions.BUCKET_KEY.key(), "f0");
-
- assertThatThrownBy(() -> createTable(options, Collections.emptyList()))
- .isInstanceOf(IllegalArgumentException.class)
- .hasMessageContaining(
- "Cannot define bucket for incremental clustering
table, it only support bucket = -1");
- }
-
@Test
public void testNonClusterIncremental() throws Exception {
Map<String, String> options = new HashMap<>();
diff --git a/paimon-core/src/test/java/org/apache/paimon/append/cluster/SorterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/cluster/SorterTest.java
new file mode 100644
index 0000000000..31fbeb5c28
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/append/cluster/SorterTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append.cluster;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.reader.RecordReaderIterator;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.MutableObjectIterator;
+
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.append.cluster.IncrementalClusterManagerTest.writeOnce;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link Sorter}. */
+public class SorterTest {
+ @TempDir java.nio.file.Path tableTempDir;
+
+ @TempDir java.nio.file.Path ioTempDir;
+
+ @ParameterizedTest
+ @ValueSource(strings = {"hilbert", "zorder", "order"})
+ public void testSorter(String curve) throws Exception {
+ innerTest(curve);
+ }
+
+ private void innerTest(String curve) throws Exception {
+ FileStoreTable table = createTable(new HashMap<>(), curve);
+ writeOnce(
+ table,
+ GenericRow.of(2, 0, BinaryString.fromString("test")),
+ GenericRow.of(2, 1, BinaryString.fromString("test")),
+ GenericRow.of(2, 2, BinaryString.fromString("test")),
+ GenericRow.of(0, 0, BinaryString.fromString("test")),
+ GenericRow.of(0, 1, BinaryString.fromString("test")),
+ GenericRow.of(0, 2, BinaryString.fromString("test")),
+ GenericRow.of(1, 0, BinaryString.fromString("test")),
+ GenericRow.of(1, 1, BinaryString.fromString("test")),
+ GenericRow.of(1, 2, BinaryString.fromString("test")));
+ IOManager ioManager = IOManager.create(ioTempDir.toString());
+ ReadBuilder readBuilder = table.newReadBuilder();
+ Sorter sorter =
+ Sorter.getSorter(
+ new RecordReaderIterator<>(
+ readBuilder.newRead().createReader(readBuilder.newScan().plan())),
+ ioManager,
+ table.rowType(),
+ table.coreOptions());
+ List<String> result = new ArrayList<>();
+ MutableObjectIterator<BinaryRow> sorted = sorter.sort();
+ BinaryRow binaryRow = new BinaryRow(sorter.arity());
+ while ((binaryRow = sorted.next(binaryRow)) != null) {
+ InternalRow rowRemovedKey = sorter.removeSortKey(binaryRow);
+ result.add(String.format("%s,%s", rowRemovedKey.getInt(0), rowRemovedKey.getInt(1)));
+ }
+ verify(curve, result);
+ }
+
+ private void verify(String curve, List<String> result) {
+ switch (curve) {
+ case "hilbert":
+ assertThat(result)
+ .containsExactly(
+ "0,0", "0,1", "1,1", "1,0", "2,0", "2,1",
"2,2", "1,2", "0,2");
+ break;
+ case "zorder":
+ assertThat(result)
+ .containsExactly(
+ "0,0", "0,1", "1,0", "1,1", "0,2", "1,2",
"2,0", "2,1", "2,2");
+ break;
+ case "order":
+ assertThat(result)
+ .containsExactly(
+ "0,0", "0,1", "0,2", "1,0", "1,1", "1,2",
"2,0", "2,1", "2,2");
+ break;
+ }
+ }
+
+ protected FileStoreTable createTable(Map<String, String> customOptions, String clusterCurve)
+ throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put(CoreOptions.CLUSTERING_COLUMNS.key(), "f0,f1");
+ options.put(CoreOptions.CLUSTERING_STRATEGY.key(), clusterCurve);
+ options.putAll(customOptions);
+
+ Schema schema =
+ new Schema(
+ RowType.of(DataTypes.INT(), DataTypes.INT(), DataTypes.STRING())
+ .getFields(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ options,
+ "");
+
+ SchemaManager schemaManager =
+ new SchemaManager(LocalFileIO.create(), new Path(tableTempDir.toString()));
+ return FileStoreTableFactory.create(
+ LocalFileIO.create(),
+ new Path(tableTempDir.toString()),
+ schemaManager.createTable(schema));
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index b807af5c86..df7fd0227e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -164,7 +164,11 @@ public class CompactAction extends TableActionBase {
StreamExecutionEnvironment env, FileStoreTable table, boolean isStreaming)
throws Exception {
if (fullCompaction == null) {
- fullCompaction = !isStreaming;
+ if (table.coreOptions().bucketClusterEnabled()) {
+ fullCompaction = false;
+ } else {
+ fullCompaction = !isStreaming;
+ }
} else {
checkArgument(
!(fullCompaction && isStreaming),
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
index 6c80070528..1538f8c083 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
@@ -683,6 +683,171 @@ public class IncrementalClusterActionITCase extends ActionITCaseBase {
assertThat(splits.get(0).deletionFiles().get().get(0)).isNull();
}
+ @Test
+ public void testClusterWithBucket() throws Exception {
+ Map<String, String> dynamicOptions = commonOptions();
+ dynamicOptions.put(CoreOptions.BUCKET.key(), "2");
+ dynamicOptions.put(CoreOptions.BUCKET_KEY.key(), "pt");
+ dynamicOptions.put(CoreOptions.BUCKET_APPEND_ORDERED.key(), "false");
+ FileStoreTable table = createTable(null, dynamicOptions);
+
+ BinaryString randomStr = BinaryString.fromString(randomString(150));
+ List<CommitMessage> messages = new ArrayList<>();
+
+ // first write
+ for (int pt = 0; pt < 2; pt++) {
+ for (int i = 0; i < 3; i++) {
+ for (int j = 0; j < 3; j++) {
+ messages.addAll(write(GenericRow.of(i, j, randomStr, pt)));
+ }
+ }
+ }
+ commit(messages);
+ ReadBuilder readBuilder = table.newReadBuilder().withProjection(new int[] {0, 1, 3});
+ List<String> result1 =
+ getResult(
+ readBuilder.newRead(),
+ readBuilder.newScan().plan().splits(),
+ readBuilder.readType());
+ List<String> expected1 = new ArrayList<>();
+ for (int pt = 0; pt <= 1; pt++) {
+ expected1.add(String.format("+I[0, 0, %s]", pt));
+ expected1.add(String.format("+I[0, 1, %s]", pt));
+ expected1.add(String.format("+I[0, 2, %s]", pt));
+ expected1.add(String.format("+I[1, 0, %s]", pt));
+ expected1.add(String.format("+I[1, 1, %s]", pt));
+ expected1.add(String.format("+I[1, 2, %s]", pt));
+ expected1.add(String.format("+I[2, 0, %s]", pt));
+ expected1.add(String.format("+I[2, 1, %s]", pt));
+ expected1.add(String.format("+I[2, 2, %s]", pt));
+ }
+ assertThat(result1).containsExactlyElementsOf(expected1);
+
+ // first cluster
+ runAction(Collections.emptyList());
+ checkSnapshot(table);
+ List<Split> splits = readBuilder.newScan().plan().splits();
+ assertThat(splits.size()).isEqualTo(2);
+ assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(1);
+ assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5);
+ List<String> result2 = getResult(readBuilder.newRead(), splits, readBuilder.readType());
+ List<String> expected2 = new ArrayList<>();
+ for (int pt = 1; pt >= 0; pt--) {
+ expected2.add(String.format("+I[0, 0, %s]", pt));
+ expected2.add(String.format("+I[0, 1, %s]", pt));
+ expected2.add(String.format("+I[1, 0, %s]", pt));
+ expected2.add(String.format("+I[1, 1, %s]", pt));
+ expected2.add(String.format("+I[0, 2, %s]", pt));
+ expected2.add(String.format("+I[1, 2, %s]", pt));
+ expected2.add(String.format("+I[2, 0, %s]", pt));
+ expected2.add(String.format("+I[2, 1, %s]", pt));
+ expected2.add(String.format("+I[2, 2, %s]", pt));
+ }
+ assertThat(result2).containsExactlyElementsOf(expected2);
+
+ // second write
+ messages.clear();
+ for (int pt = 0; pt <= 1; pt++) {
+ messages.addAll(
+ write(
+ GenericRow.of(0, 3, null, pt),
+ GenericRow.of(1, 3, null, pt),
+ GenericRow.of(2, 3, null, pt)));
+ messages.addAll(
+ write(
+ GenericRow.of(3, 0, null, pt),
+ GenericRow.of(3, 1, null, pt),
+ GenericRow.of(3, 2, null, pt),
+ GenericRow.of(3, 3, null, pt)));
+ }
+ commit(messages);
+
+ List<String> result3 =
+ getResult(
+ readBuilder.newRead(),
+ readBuilder.newScan().plan().splits(),
+ readBuilder.readType());
+ List<String> expected3 = new ArrayList<>();
+ for (int pt = 1; pt >= 0; pt--) {
+ expected3.add(String.format("+I[0, 0, %s]", pt));
+ expected3.add(String.format("+I[0, 1, %s]", pt));
+ expected3.add(String.format("+I[1, 0, %s]", pt));
+ expected3.add(String.format("+I[1, 1, %s]", pt));
+ expected3.add(String.format("+I[0, 2, %s]", pt));
+ expected3.add(String.format("+I[1, 2, %s]", pt));
+ expected3.add(String.format("+I[2, 0, %s]", pt));
+ expected3.add(String.format("+I[2, 1, %s]", pt));
+ expected3.add(String.format("+I[2, 2, %s]", pt));
+ expected3.add(String.format("+I[0, 3, %s]", pt));
+ expected3.add(String.format("+I[1, 3, %s]", pt));
+ expected3.add(String.format("+I[2, 3, %s]", pt));
+ expected3.add(String.format("+I[3, 0, %s]", pt));
+ expected3.add(String.format("+I[3, 1, %s]", pt));
+ expected3.add(String.format("+I[3, 2, %s]", pt));
+ expected3.add(String.format("+I[3, 3, %s]", pt));
+ }
+ assertThat(result3).containsExactlyElementsOf(expected3);
+
+ // second cluster(incremental)
+ runAction(Collections.emptyList());
+ checkSnapshot(table);
+ splits = readBuilder.newScan().plan().splits();
+ List<String> result4 = getResult(readBuilder.newRead(), splits, readBuilder.readType());
+ List<String> expected4 = new ArrayList<>();
+ for (int pt = 1; pt >= 0; pt--) {
+ expected4.add(String.format("+I[0, 0, %s]", pt));
+ expected4.add(String.format("+I[0, 1, %s]", pt));
+ expected4.add(String.format("+I[1, 0, %s]", pt));
+ expected4.add(String.format("+I[1, 1, %s]", pt));
+ expected4.add(String.format("+I[0, 2, %s]", pt));
+ expected4.add(String.format("+I[1, 2, %s]", pt));
+ expected4.add(String.format("+I[2, 0, %s]", pt));
+ expected4.add(String.format("+I[2, 1, %s]", pt));
+ expected4.add(String.format("+I[2, 2, %s]", pt));
+ expected4.add(String.format("+I[0, 3, %s]", pt));
+ expected4.add(String.format("+I[1, 3, %s]", pt));
+ expected4.add(String.format("+I[3, 0, %s]", pt));
+ expected4.add(String.format("+I[3, 1, %s]", pt));
+ expected4.add(String.format("+I[2, 3, %s]", pt));
+ expected4.add(String.format("+I[3, 2, %s]", pt));
+ expected4.add(String.format("+I[3, 3, %s]", pt));
+ }
+ assertThat(splits.size()).isEqualTo(2);
+ assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(2);
+ assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5);
+ assertThat(((DataSplit) splits.get(0)).dataFiles().get(1).level()).isEqualTo(4);
+ assertThat(result4).containsExactlyElementsOf(expected4);
+
+ // full cluster
+ runAction(Lists.newArrayList("--compact_strategy", "full"));
+ checkSnapshot(table);
+ splits = readBuilder.newScan().plan().splits();
+ List<String> result5 = getResult(readBuilder.newRead(), splits, readBuilder.readType());
+ List<String> expected5 = new ArrayList<>();
+ for (int pt = 1; pt >= 0; pt--) {
+ expected5.add(String.format("+I[0, 0, %s]", pt));
+ expected5.add(String.format("+I[0, 1, %s]", pt));
+ expected5.add(String.format("+I[1, 0, %s]", pt));
+ expected5.add(String.format("+I[1, 1, %s]", pt));
+ expected5.add(String.format("+I[0, 2, %s]", pt));
+ expected5.add(String.format("+I[0, 3, %s]", pt));
+ expected5.add(String.format("+I[1, 2, %s]", pt));
+ expected5.add(String.format("+I[1, 3, %s]", pt));
+ expected5.add(String.format("+I[2, 0, %s]", pt));
+ expected5.add(String.format("+I[2, 1, %s]", pt));
+ expected5.add(String.format("+I[3, 0, %s]", pt));
+ expected5.add(String.format("+I[3, 1, %s]", pt));
+ expected5.add(String.format("+I[2, 2, %s]", pt));
+ expected5.add(String.format("+I[2, 3, %s]", pt));
+ expected5.add(String.format("+I[3, 2, %s]", pt));
+ expected5.add(String.format("+I[3, 3, %s]", pt));
+ }
+ assertThat(splits.size()).isEqualTo(2);
+ assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(1);
+ assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5);
+ assertThat(result5).containsExactlyElementsOf(expected5);
+ }
+
protected FileStoreTable createTable(String partitionKeys) throws Exception {
return createTable(partitionKeys, commonOptions());
}