This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 97ffce0374 [spark] Apply partition.sink-strategy to spark write (#5856)
97ffce0374 is described below

commit 97ffce0374ac1212ebcfb86709fe40c44d567cc5
Author: Zouxxyy <[email protected]>
AuthorDate: Wed Jul 9 16:59:51 2025 +0800

    [spark] Apply partition.sink-strategy to spark write (#5856)
---
 .../paimon/spark/commands/PaimonSparkWriter.scala  | 21 ++++++--
 .../spark/write/PaimonWriteRequirement.scala       |  8 ++-
 .../paimon/spark/sql/WriteDistributeModeTest.scala | 57 ++++++++++++++++++++++
 3 files changed, 80 insertions(+), 6 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index e8c72ccff2..d3ea09eb4e 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -19,7 +19,7 @@
 package org.apache.paimon.spark.commands
 
 import org.apache.paimon.CoreOptions
-import org.apache.paimon.CoreOptions.WRITE_ONLY
+import org.apache.paimon.CoreOptions.{PartitionSinkStrategy, WRITE_ONLY}
 import org.apache.paimon.codegen.CodeGenUtils
 import org.apache.paimon.crosspartition.{IndexBootstrap, KeyPartOrRow}
 import org.apache.paimon.data.serializer.InternalSerializers
@@ -42,7 +42,7 @@ import org.apache.paimon.utils.SerializationUtils
 
 import org.apache.spark.{Partitioner, TaskContext}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql._
 import org.apache.spark.sql.functions._
 
 import java.io.IOException
@@ -233,7 +233,15 @@ case class PaimonSparkWriter(table: FileStoreTable) extends WriteHelper {
         }
 
       case BUCKET_UNAWARE | POSTPONE_MODE =>
-        writeWithoutBucket(data)
+        if (
+          coreOptions.partitionSinkStrategy().equals(PartitionSinkStrategy.HASH) && !tableSchema
+            .partitionKeys()
+            .isEmpty
+        ) {
+          writeWithoutBucket(data.repartition(partitionCols(data): _*))
+        } else {
+          writeWithoutBucket(data)
+        }
 
       case HASH_FIXED =>
         if (paimonExtensionEnabled && BucketFunction.supportsTable(table)) {
@@ -410,14 +418,17 @@ case class PaimonSparkWriter(table: FileStoreTable) extends WriteHelper {
   }
 
   private def repartitionByPartitionsAndBucket(df: DataFrame): DataFrame = {
+    df.repartition(partitionCols(df) ++ Seq(col(BUCKET_COL)): _*)
+  }
+
+  def partitionCols(df: DataFrame): Seq[Column] = {
     val inputSchema = df.schema
-    val partitionCols = tableSchema
+    tableSchema
       .partitionKeys()
       .asScala
       .map(tableSchema.fieldNames().indexOf(_))
       .map(x => col(inputSchema.fieldNames(x)))
       .toSeq
-    df.repartition(partitionCols ++ Seq(col(BUCKET_COL)): _*)
   }
 
   private def deserializeCommitMessage(
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonWriteRequirement.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonWriteRequirement.scala
index 097dbf7907..1f95c19146 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonWriteRequirement.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonWriteRequirement.scala
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.spark.write
 
+import org.apache.paimon.CoreOptions.PartitionSinkStrategy
 import org.apache.paimon.spark.commands.BucketExpression.quote
 import org.apache.paimon.table.BucketMode._
 import org.apache.paimon.table.FileStoreTable
@@ -56,7 +57,12 @@ object PaimonWriteRequirement {
     val clusteringExpressions =
       (partitionTransforms ++ bucketTransforms).map(identity[Expression]).toArray
 
-    if (clusteringExpressions.isEmpty) {
+    if (
+      clusteringExpressions.isEmpty || (bucketTransforms.isEmpty && table
+        .coreOptions()
+        .partitionSinkStrategy()
+        .equals(PartitionSinkStrategy.NONE))
+    ) {
       EMPTY
     } else {
       val distribution: ClusteredDistribution =
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/WriteDistributeModeTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/WriteDistributeModeTest.scala
new file mode 100644
index 0000000000..23b8602431
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/WriteDistributeModeTest.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.CommandResultExec
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeLike
+
+class WriteDistributeModeTest extends PaimonSparkTestBase with AdaptiveSparkPlanHelper {
+
+  test("Write distribute mode: write partitioned bucket -1 table") {
+    for (distributeMode <- Seq("none", "hash")) {
+      withTable("t") {
+        sql(
+          "CREATE TABLE t (id INT, pt STRING) partitioned by (pt) TBLPROPERTIES ('file.format'='avro')")
+        val query = "INSERT INTO t VALUES (1, 'p1'), (2, 'p2')"
+
+        withSparkSQLConf(
+          "spark.paimon.write.use-v2-write" -> "true",
+          "spark.paimon.partition.sink-strategy" -> distributeMode) {
+          val df = spark.sql(query)
+          val shuffleNodes = collect(
+            df.queryExecution.executedPlan.asInstanceOf[CommandResultExec].commandPhysicalPlan) {
+            case shuffle: ShuffleExchangeLike => shuffle
+          }
+
+          if (distributeMode == "none") {
+            assert(shuffleNodes.isEmpty)
+          } else {
+            assert(shuffleNodes.size == 1)
+          }
+
+          checkAnswer(spark.sql("SELECT * FROM t ORDER BY id"), Seq(Row(1, "p1"), Row(2, "p2")))
+        }
+      }
+    }
+  }
+}

Reply via email to