This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 2b9eb2096d7f733c875430cae634b5c3cdb9d861
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Nov 11 22:02:03 2025 +0800

    [spark] Fix write multiple cols with key dynamic table (#6585)
    
    (cherry picked from commit 4f9f28e4c1ecee3fc4abe9fcf5f24bfe150a88f4)
---
 .../paimon/spark/commands/PaimonSparkWriter.scala   | 11 ++++-------
 .../paimon/spark/sql/DynamicBucketTableTest.scala   | 21 +++++++++++++++++++++
 2 files changed, 25 insertions(+), 7 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index ee3104e9b3..a130fde527 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -172,11 +172,8 @@ case class PaimonSparkWriter(table: FileStoreTable, writeRowTracking: Boolean =
         val assignerParallelism = Option(coreOptions.dynamicBucketAssignerParallelism)
           .map(_.toInt)
           .getOrElse(sparkParallelism)
-        val bootstrapped = bootstrapAndRepartitionByKeyHash(
-          withInitBucketCol,
-          assignerParallelism,
-          rowKindColIdx,
-          rowType)
+        val bootstrapped =
+          bootstrapAndRepartitionByKeyHash(withInitBucketCol, assignerParallelism, rowKindColIdx)
 
         val globalDynamicBucketProcessor =
           GlobalDynamicBucketProcessor(
@@ -378,11 +375,11 @@ case class PaimonSparkWriter(table: FileStoreTable, writeRowTracking: Boolean =
   private def bootstrapAndRepartitionByKeyHash(
       data: DataFrame,
       parallelism: Int,
-      rowKindColIdx: Int,
-      rowType: RowType): RDD[(KeyPartOrRow, Array[Byte])] = {
+      rowKindColIdx: Int): RDD[(KeyPartOrRow, Array[Byte])] = {
     val numSparkPartitions = data.rdd.getNumPartitions
     val primaryKeys = table.schema().primaryKeys()
     val bootstrapType = IndexBootstrap.bootstrapType(table.schema())
+    val rowType = table.rowType()
     data.rdd
       .mapPartitions {
         iter =>
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DynamicBucketTableTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DynamicBucketTableTest.scala
index f005156182..fee7973aca 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DynamicBucketTableTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DynamicBucketTableTest.scala
@@ -200,4 +200,25 @@ class DynamicBucketTableTest extends PaimonSparkTestBase {
 
     checkAnswer(sql("SELECT DISTINCT bucket FROM `T$FILES`"), Seq(Row(0), Row(1), Row(2)))
   }
+
+  test("Paimon cross partition table: write multiple cols") {
+    withTable("t") {
+      val columns = (0 until 118).map(i => s"c$i STRING").mkString(", ")
+      sql(s"""
+             |CREATE TABLE t (
+             |  $columns,
+             |  pt STRING
+             |)
+             |PARTITIONED BY (pt)
+             |TBLPROPERTIES (
+             |  'primary-key' = 'c0');
+             |""".stripMargin)
+
+      sql("INSERT INTO t (c0, c1, c2, pt) VALUES ('c0','c1','c2','pt')")
+      checkAnswer(
+        sql("SELECT c0, c1, c2, pt from t"),
+        Row("c0", "c1", "c2", "pt")
+      )
+    }
+  }
 }
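
For reference, the scenario exercised by the new test can also be run as a standalone job, roughly as sketched below. This is an illustrative sketch only, not part of the commit; it assumes a SparkSession already configured with the Paimon Spark catalog so that CREATE TABLE produces a Paimon table.

    import org.apache.spark.sql.SparkSession

    object CrossPartitionWriteRepro {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("paimon-cross-partition-write")
          .getOrCreate()

        // 118 value columns plus a partition column; the primary key does not
        // include the partition column, i.e. a cross-partition (key dynamic
        // bucket) table, as in the new test.
        val columns = (0 until 118).map(i => s"c$i STRING").mkString(", ")
        spark.sql(
          s"""CREATE TABLE t (
             |  $columns,
             |  pt STRING
             |)
             |PARTITIONED BY (pt)
             |TBLPROPERTIES ('primary-key' = 'c0')
             |""".stripMargin)

        // Write only a few of the many columns, mirroring the test case.
        spark.sql("INSERT INTO t (c0, c1, c2, pt) VALUES ('c0','c1','c2','pt')")
        spark.sql("SELECT c0, c1, c2, pt FROM t").show()

        spark.stop()
      }
    }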
