This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2558c585cc [spark] Spark write supports 'clustering.columns' (#5902)
2558c585cc is described below

commit 2558c585cc0ab7da01a1acf674b7d18e447fca01
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Jul 15 19:01:37 2025 +0800

    [spark] Spark write supports 'clustering.columns' (#5902)
---
 .../shortcodes/generated/core_configuration.html   | 24 +++++++---
 .../generated/flink_connector_configuration.html   | 12 -----
 .../main/java/org/apache/paimon/CoreOptions.java   | 54 ++++++++++++++++++++++
 .../apache/paimon/flink/FlinkConnectorOptions.java | 20 --------
 .../apache/paimon/flink/sink/FlinkSinkBuilder.java | 24 ++--------
 .../paimon/flink/sink/FlinkTableSinkBase.java      |  4 +-
 .../apache/paimon/spark/sort/SparkZOrderUDF.java   | 26 ++++-------
 .../org/apache/paimon/spark/sort/ZorderSorter.java |  1 +
 .../paimon/spark/commands/PaimonSparkWriter.scala  | 23 +++++----
 .../org/apache/paimon/spark/SparkWriteITCase.java  | 14 ++++++
 10 files changed, 119 insertions(+), 83 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 0db11d7707..ed6ba5da13 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -56,6 +56,12 @@ under the License.
             <td>Integer</td>
             <td>Bucket number for file store.<br />It should either be equal 
to -1 (dynamic bucket mode), -2 (postpone bucket mode), or it must be greater 
than 0 (fixed bucket mode).</td>
         </tr>
+        <tr>
+            <td><h5>bucket-append-ordered</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>Whether to ignore the order of the buckets when reading data 
from an append-only table.</td>
+        </tr>
         <tr>
             <td><h5>bucket-function.type</h5></td>
             <td style="word-wrap: break-word;">default</td>
@@ -68,12 +74,6 @@ under the License.
             <td>String</td>
             <td>Specify the paimon distribution policy. Data is assigned to 
each bucket according to the hash value of bucket-key.<br />If you specify 
multiple fields, delimiter is ','.<br />If not specified, the primary key will 
be used; if there is no primary key, the full row will be used.</td>
         </tr>
-        <tr>
-            <td><h5>bucket-append-ordered</h5></td>
-            <td style="word-wrap: break-word;">true</td>
-            <td>Boolean</td>
-            <td>Whether to ignore the order of the buckets when reading data 
from an append-only table.</td>
-        </tr>
         <tr>
             <td><h5>cache-page-size</h5></td>
             <td style="word-wrap: break-word;">64 kb</td>
@@ -140,6 +140,18 @@ under the License.
             <td>Duration</td>
             <td>The maximum time of completed changelog to retain.</td>
         </tr>
+        <tr>
+            <td><h5>clustering.columns</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>String</td>
+            <td>Specifies the column name(s) used for comparison during range 
partitioning, in the format 'columnName1,columnName2'. If not set or set to an 
empty string, it indicates that the range partitioning feature is not enabled. 
This option will be effective only for append table without primary keys and 
batch execution mode.</td>
+        </tr>
+        <tr>
+            <td><h5>clustering.strategy</h5></td>
+            <td style="word-wrap: break-word;">"auto"</td>
+            <td>String</td>
+            <td>Specifies the comparison algorithm used for range 
partitioning, including 'zorder', 'hilbert', and 'order', corresponding to the 
z-order curve algorithm, hilbert curve algorithm, and basic type comparison 
algorithm, respectively. When not configured, it will automatically determine 
the algorithm based on the number of columns in 'clustering.columns'. 
'order' is used for 1 column, 'zorder' for less than 5 columns, and 'hilbert' 
for 5 or more columns.</td>
+        </tr>
         <tr>
             <td><h5>commit.callback.#.param</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git 
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html 
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index cbc1f2e83c..12aded973f 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -230,12 +230,6 @@ under the License.
             <td>Duration</td>
             <td>If no records flow in a partition of a stream for that amount 
of time, then that partition is considered "idle" and will not hold back the 
progress of watermarks in downstream operators.</td>
         </tr>
-        <tr>
-            <td><h5>sink.clustering.by-columns</h5></td>
-            <td style="word-wrap: break-word;">(none)</td>
-            <td>String</td>
-            <td>Specifies the column name(s) used for comparison during range 
partitioning, in the format 'columnName1,columnName2'. If not set or set to an 
empty string, it indicates that the range partitioning feature is not enabled. 
This option will be effective only for bucket unaware table without primary 
keys and batch execution mode.</td>
-        </tr>
         <tr>
             <td><h5>sink.clustering.sample-factor</h5></td>
             <td style="word-wrap: break-word;">100</td>
@@ -248,12 +242,6 @@ under the License.
             <td>Boolean</td>
             <td>Indicates whether to further sort data belonged to each sink 
task after range partitioning.</td>
         </tr>
-        <tr>
-            <td><h5>sink.clustering.strategy</h5></td>
-            <td style="word-wrap: break-word;">"auto"</td>
-            <td>String</td>
-            <td>Specifies the comparison algorithm used for range 
partitioning, including 'zorder', 'hilbert', and 'order', corresponding to the 
z-order curve algorithm, hilbert curve algorithm, and basic type comparison 
algorithm, respectively. When not configured, it will automatically determine 
the algorithm based on the number of columns in 'sink.clustering.by-columns'. 
'order' is used for 1 column, 'zorder' for less than 5 columns, and 'hilbert' 
for 5 or more columns.</td>
-        </tr>
         <tr>
             <td><h5>sink.committer-cpu</h5></td>
             <td style="word-wrap: break-word;">1.0</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 964116dbe7..2d57a7ca48 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -57,6 +57,9 @@ import java.util.UUID;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
+import static org.apache.paimon.CoreOptions.OrderType.HILBERT;
+import static org.apache.paimon.CoreOptions.OrderType.ORDER;
+import static org.apache.paimon.CoreOptions.OrderType.ZORDER;
 import static org.apache.paimon.options.ConfigOptions.key;
 import static org.apache.paimon.options.MemorySize.VALUE_128_MB;
 import static org.apache.paimon.options.MemorySize.VALUE_256_MB;
@@ -1835,6 +1838,28 @@ public class CoreOptions implements Serializable {
                                     + "starting from the snapshot after this 
one. If found, commit will be aborted. "
                                     + "If the value of this option is -1, 
committer will not check for its first commit.");
 
+    /**
+     * Option holding the comma-separated names of the columns to cluster by. It has no default
+     * value and declares 'sink.clustering.by-columns' as a fallback key.
+     */
+    public static final ConfigOption<String> CLUSTERING_COLUMNS =
+            key("clustering.columns")
+                    .stringType()
+                    .noDefaultValue()
+                    .withFallbackKeys("sink.clustering.by-columns")
+                    .withDescription(
+                            "Specifies the column name(s) used for comparison 
during range partitioning, in the format 'columnName1,columnName2'. "
+                                    + "If not set or set to an empty string, 
it indicates that the range partitioning feature is not enabled. "
+                                    + "This option will be effective only for 
append table without primary keys and batch execution mode.");
+
+    /**
+     * Option selecting the ordering algorithm applied to the clustering columns. Its default
+     * value is "auto" and it declares 'sink.clustering.strategy' as a fallback key.
+     */
+    public static final ConfigOption<String> CLUSTERING_STRATEGY =
+            key("clustering.strategy")
+                    .stringType()
+                    .defaultValue("auto")
+                    .withFallbackKeys("sink.clustering.strategy")
+                    .withDescription(
+                            "Specifies the comparison algorithm used for range partitioning, including 'zorder', 'hilbert', and 'order', "
+                                    + "corresponding to the z-order curve algorithm, hilbert curve algorithm, and basic type comparison algorithm, "
+                                    + "respectively. When not configured, it will automatically determine the algorithm based on the number of columns "
+                                    + "in 'clustering.columns'. 'order' is used for 1 column, 'zorder' for less than 5 columns, "
+                                    + "and 'hilbert' for 5 or more columns.");
+
     private final Options options;
 
     public CoreOptions(Map<String, String> options) {
@@ -2803,6 +2828,35 @@ public class CoreOptions implements Serializable {
         return options.getOptional(COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT);
     }
 
+    /** Returns the clustering column names parsed from the {@link #CLUSTERING_COLUMNS} option. */
+    public List<String> clusteringColumns() {
+        String configured = options.get(CLUSTERING_COLUMNS);
+        return clusteringColumns(configured);
+    }
+
+    /**
+     * Resolves the clustering order type from the {@link #CLUSTERING_STRATEGY} option.
+     *
+     * @param columnSize column count used when the strategy has to be chosen automatically
+     */
+    public OrderType clusteringStrategy(int columnSize) {
+        String configured = options.get(CLUSTERING_STRATEGY);
+        return clusteringStrategy(configured, columnSize);
+    }
+
+    /**
+     * Parses a comma-separated list of clustering column names.
+     *
+     * <p>Whitespace around each name is removed and empty entries are dropped, so values such as
+     * {@code "a, b"} or {@code "a,,b"} both resolve to {@code [a, b]}.
+     *
+     * @param clusteringColumns raw option value, may be {@code null}
+     * @return the column names in their configured order; empty when the value is {@code null}
+     *     or contains no names
+     */
+    public static List<String> clusteringColumns(String clusteringColumns) {
+        if (clusteringColumns == null) {
+            return Collections.emptyList();
+        }
+        return Arrays.stream(clusteringColumns.split(","))
+                .map(String::trim)
+                // Blank entries would otherwise surface later as unknown column names.
+                .filter(name -> !name.isEmpty())
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Resolves the {@link OrderType} to use for clustering.
+     *
+     * <p>A value different from the default of {@link #CLUSTERING_STRATEGY} is converted with
+     * {@link OrderType#of}. Otherwise the type is derived from {@code columnSize}: exactly 1
+     * gives {@code ORDER}, any other value below 5 gives {@code ZORDER}, and everything else
+     * gives {@code HILBERT}.
+     *
+     * @param clusteringStrategy configured strategy name, must not be {@code null}
+     * @param columnSize column count driving the automatic choice
+     */
+    public static OrderType clusteringStrategy(String clusteringStrategy, int columnSize) {
+        if (!clusteringStrategy.equals(CLUSTERING_STRATEGY.defaultValue())) {
+            // An explicitly configured strategy always wins over the automatic choice.
+            return OrderType.of(clusteringStrategy);
+        }
+        if (columnSize == 1) {
+            return ORDER;
+        }
+        return columnSize < 5 ? ZORDER : HILBERT;
+    }
+
     /** Specifies the merge engine for table with primary key. */
     public enum MergeEngine implements DescribedEnum {
         DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
index 831a7ac4ef..08eb0f9e2e 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkConnectorOptions.java
@@ -404,26 +404,6 @@ public class FlinkConnectorOptions {
                     .withDescription(
                             "Whether trigger partition mark done when recover 
from state.");
 
-    public static final ConfigOption<String> CLUSTERING_COLUMNS =
-            key("sink.clustering.by-columns")
-                    .stringType()
-                    .noDefaultValue()
-                    .withDescription(
-                            "Specifies the column name(s) used for comparison 
during range partitioning, in the format 'columnName1,columnName2'. "
-                                    + "If not set or set to an empty string, 
it indicates that the range partitioning feature is not enabled. "
-                                    + "This option will be effective only for 
bucket unaware table without primary keys and batch execution mode.");
-
-    public static final ConfigOption<String> CLUSTERING_STRATEGY =
-            key("sink.clustering.strategy")
-                    .stringType()
-                    .defaultValue("auto")
-                    .withDescription(
-                            "Specifies the comparison algorithm used for range 
partitioning, including 'zorder', 'hilbert', and 'order', "
-                                    + "corresponding to the z-order curve 
algorithm, hilbert curve algorithm, and basic type comparison algorithm, "
-                                    + "respectively. When not configured, it 
will automatically determine the algorithm based on the number of columns "
-                                    + "in 'sink.clustering.by-columns'. 
'order' is used for 1 column, 'zorder' for less than 5 columns, "
-                                    + "and 'hilbert' for 5 or more columns.");
-
     public static final ConfigOption<Boolean> CLUSTERING_SORT_IN_CLUSTER =
             key("sink.clustering.sort-in-cluster")
                     .booleanType()
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
index c8d635422f..847cf06990 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSinkBuilder.java
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.flink.sink;
 
-import org.apache.paimon.CoreOptions.OrderType;
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.CoreOptions.PartitionSinkStrategy;
 import org.apache.paimon.annotation.Public;
 import org.apache.paimon.data.InternalRow;
@@ -48,17 +48,13 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.paimon.CoreOptions.OrderType.HILBERT;
-import static org.apache.paimon.CoreOptions.OrderType.ORDER;
-import static org.apache.paimon.CoreOptions.OrderType.ZORDER;
+import static org.apache.paimon.CoreOptions.clusteringStrategy;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_SAMPLE_FACTOR;
-import static 
org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_STRATEGY;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.MIN_CLUSTERING_SAMPLE_FACTOR;
 import static org.apache.paimon.flink.sink.FlinkSink.isStreaming;
 import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
@@ -146,7 +142,8 @@ public class FlinkSinkBuilder {
             int sampleFactor) {
         // The clustering will be skipped if the clustering columns are empty 
or the execution
         // mode is STREAMING or the table type is illegal.
-        if (clusteringColumns == null || clusteringColumns.isEmpty()) {
+        List<String> columns = 
CoreOptions.clusteringColumns(clusteringColumns);
+        if (columns.isEmpty()) {
             return this;
         }
         checkState(input != null, "The input stream should be specified 
earlier.");
@@ -159,7 +156,6 @@ public class FlinkSinkBuilder {
         }
         // If the clustering is not skipped, check the clustering column names 
and sample
         // factor value.
-        List<String> columns = Arrays.asList(clusteringColumns.split(","));
         List<String> fieldNames = table.schema().fieldNames();
         checkState(
                 new HashSet<>(fieldNames).containsAll(new HashSet<>(columns)),
@@ -174,17 +170,7 @@ public class FlinkSinkBuilder {
                         + MIN_CLUSTERING_SAMPLE_FACTOR
                         + ".");
         TableSortInfo.Builder sortInfoBuilder = new TableSortInfo.Builder();
-        if (clusteringStrategy.equals(CLUSTERING_STRATEGY.defaultValue())) {
-            if (columns.size() == 1) {
-                sortInfoBuilder.setSortStrategy(ORDER);
-            } else if (columns.size() < 5) {
-                sortInfoBuilder.setSortStrategy(ZORDER);
-            } else {
-                sortInfoBuilder.setSortStrategy(HILBERT);
-            }
-        } else {
-            sortInfoBuilder.setSortStrategy(OrderType.of(clusteringStrategy));
-        }
+        sortInfoBuilder.setSortStrategy(clusteringStrategy(clusteringStrategy, 
columns.size()));
         int upstreamParallelism = input.getParallelism();
         String sinkParallelismValue =
                 
table.options().get(FlinkConnectorOptions.SINK_PARALLELISM.key());
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
index e719b9a77a..bf2bee15dc 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
@@ -43,12 +43,12 @@ import java.util.HashMap;
 import java.util.Map;
 
 import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
+import static org.apache.paimon.CoreOptions.CLUSTERING_COLUMNS;
+import static org.apache.paimon.CoreOptions.CLUSTERING_STRATEGY;
 import static org.apache.paimon.CoreOptions.LOG_CHANGELOG_MODE;
 import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
-import static org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_COLUMNS;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_SAMPLE_FACTOR;
 import static 
org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_SORT_IN_CLUSTER;
-import static 
org.apache.paimon.flink.FlinkConnectorOptions.CLUSTERING_STRATEGY;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_PARALLELISM;
 
 /** Table sink to create sink. */
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/SparkZOrderUDF.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/SparkZOrderUDF.java
index 45858fe62a..9cd0c080d0 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/SparkZOrderUDF.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/SparkZOrderUDF.java
@@ -37,8 +37,6 @@ import org.apache.spark.sql.types.ShortType;
 import org.apache.spark.sql.types.StringType;
 import org.apache.spark.sql.types.TimestampType;
 
-import java.io.IOException;
-import java.io.ObjectInputStream;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 
@@ -51,13 +49,8 @@ import scala.collection.Seq;
 
 /** Spark udf to calculate zorder bytes. Copied from iceberg. */
 public class SparkZOrderUDF implements Serializable {
-    private static final byte[] PRIMITIVE_EMPTY = new 
byte[ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE];
 
-    /**
-     * Every Spark task runs iteratively on a rows in a single thread so 
ThreadLocal should protect
-     * from concurrent access to any of these structures.
-     */
-    private transient ThreadLocal<ByteBuffer> outputBuffer;
+    private static final byte[] PRIMITIVE_EMPTY = new 
byte[ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE];
 
     private transient ThreadLocal<byte[][]> inputHolder;
     private transient ThreadLocal<ByteBuffer[]> inputBuffers;
@@ -75,14 +68,10 @@ public class SparkZOrderUDF implements Serializable {
         this.maxOutputSize = maxOutputSize;
     }
 
-    private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
-        in.defaultReadObject();
-        inputBuffers = ThreadLocal.withInitial(() -> new ByteBuffer[numCols]);
-        inputHolder = ThreadLocal.withInitial(() -> new byte[numCols][]);
-        outputBuffer = ThreadLocal.withInitial(() -> 
ByteBuffer.allocate(totalOutputBytes));
-    }
-
     private ByteBuffer inputBuffer(int position, int size) {
+        if (inputBuffers == null) {
+            inputBuffers = ThreadLocal.withInitial(() -> new 
ByteBuffer[numCols]);
+        }
         ByteBuffer buffer = inputBuffers.get()[position];
         if (buffer == null) {
             buffer = ByteBuffer.allocate(size);
@@ -92,9 +81,14 @@ public class SparkZOrderUDF implements Serializable {
     }
 
     byte[] interleaveBits(Seq<byte[]> scalaBinary) {
+        if (inputHolder == null) {
+            inputHolder = ThreadLocal.withInitial(() -> new byte[numCols][]);
+        }
+
         byte[][] columnsBinary =
                 
JavaConverters.seqAsJavaList(scalaBinary).toArray(inputHolder.get());
-        return ZOrderByteUtils.interleaveBits(columnsBinary, totalOutputBytes, 
outputBuffer.get());
+        return ZOrderByteUtils.interleaveBits(
+                columnsBinary, totalOutputBytes, 
ByteBuffer.allocate(totalOutputBytes));
     }
 
     private UserDefinedFunction tinyToOrderedBytesUDF() {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/ZorderSorter.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/ZorderSorter.java
index f3f8a50735..2a6d1b2ceb 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/ZorderSorter.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/ZorderSorter.java
@@ -38,6 +38,7 @@ public class ZorderSorter extends TableSorter {
         checkNotEmpty();
     }
 
+    @Override
     public Dataset<Row> sort(Dataset<Row> df) {
         Column zColumn = zValue(df);
         Dataset<Row> zValueDF = df.withColumn(Z_COLUMN, zColumn);
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index d3ea09eb4e..b8b6650abc 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -31,6 +31,7 @@ import org.apache.paimon.manifest.FileKind
 import org.apache.paimon.spark.{SparkRow, SparkTableWrite, SparkTypeUtils}
 import org.apache.paimon.spark.catalog.functions.BucketFunction
 import org.apache.paimon.spark.schema.SparkSystemColumns.{BUCKET_COL, 
ROW_KIND_COL}
+import org.apache.paimon.spark.sort.TableSorter
 import org.apache.paimon.spark.util.OptionUtils.paimonExtensionEnabled
 import org.apache.paimon.spark.util.SparkRowUtils
 import org.apache.paimon.spark.write.WriteHelper
@@ -233,15 +234,21 @@ case class PaimonSparkWriter(table: FileStoreTable) 
extends WriteHelper {
         }
 
       case BUCKET_UNAWARE | POSTPONE_MODE =>
-        if (
-          
coreOptions.partitionSinkStrategy().equals(PartitionSinkStrategy.HASH) && 
!tableSchema
-            .partitionKeys()
-            .isEmpty
-        ) {
-          writeWithoutBucket(data.repartition(partitionCols(data): _*))
-        } else {
-          writeWithoutBucket(data)
+        var input = data
+        // Hash-repartition by the partition columns only for partitioned tables whose sink
+        // strategy is HASH.
+        if (tableSchema.partitionKeys().size() > 0) {
+          coreOptions.partitionSinkStrategy match {
+            case PartitionSinkStrategy.HASH =>
+              input = data.repartition(partitionCols(data): _*)
+            case _ =>
+          }
+        }
+        val clusteringColumns = coreOptions.clusteringColumns()
+        if (!clusteringColumns.isEmpty) {
+          // The automatic strategy is chosen from the number of clustering columns, not from
+          // the number of fields in the table schema.
+          val strategy = coreOptions.clusteringStrategy(clusteringColumns.size())
+          val sorter = TableSorter.getSorter(table, strategy, clusteringColumns)
+          // The sort starts from `data`, so a hash repartitioning assigned above is replaced
+          // rather than applied on top of it.
+          input = sorter.sort(data)
+        }
+        writeWithoutBucket(input)
 
       case HASH_FIXED =>
         if (paimonExtensionEnabled && BucketFunction.supportsTable(table)) {
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
index 6b1b797b24..b0737e208f 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
+++ 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
@@ -35,6 +35,8 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -124,6 +126,18 @@ public class SparkWriteITCase {
                         "[[1,2,my_value], [2,2,my_value], [3,2,my_value], 
[4,2,my_value], [5,3,my_value]]");
     }
 
+    /** Writes unordered rows into a clustered table and checks the order they are read back in. */
+    @ParameterizedTest
+    @ValueSource(strings = {"order", "zorder", "hilbert"})
+    public void testWriteWithClustering(String clusterStrategy) {
+        // Both clustering settings are passed as table properties; the strategy is the parameter.
+        String ddl =
+                String.format(
+                        "CREATE TABLE T (a INT, b INT) TBLPROPERTIES ("
+                                + "'clustering.columns'='a,b',"
+                                + "'clustering.strategy'='%s')",
+                        clusterStrategy);
+        spark.sql(ddl);
+
+        // The rows are deliberately inserted out of order.
+        spark.sql("INSERT INTO T VALUES (2, 2), (1, 1), (3, 3)").collectAsList();
+
+        List<Row> actual = spark.sql("SELECT * FROM T").collectAsList();
+        assertThat(actual.toString()).isEqualTo("[[1,1], [2,2], [3,3]]");
+    }
+
     @Test
     public void testWrite() {
         spark.sql(

Reply via email to