This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 2b9eb2096d7f733c875430cae634b5c3cdb9d861
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Nov 11 22:02:03 2025 +0800

    [spark] Fix write multiple cols with key dynamic table (#6585)

    (cherry picked from commit 4f9f28e4c1ecee3fc4abe9fcf5f24bfe150a88f4)
---
 .../paimon/spark/commands/PaimonSparkWriter.scala  | 11 ++++-------
 .../paimon/spark/sql/DynamicBucketTableTest.scala  | 21 +++++++++++++++++++++
 2 files changed, 25 insertions(+), 7 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index ee3104e9b3..a130fde527 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -172,11 +172,8 @@ case class PaimonSparkWriter(table: FileStoreTable, writeRowTracking: Boolean =
         val assignerParallelism = Option(coreOptions.dynamicBucketAssignerParallelism)
           .map(_.toInt)
           .getOrElse(sparkParallelism)
-        val bootstrapped = bootstrapAndRepartitionByKeyHash(
-          withInitBucketCol,
-          assignerParallelism,
-          rowKindColIdx,
-          rowType)
+        val bootstrapped =
+          bootstrapAndRepartitionByKeyHash(withInitBucketCol, assignerParallelism, rowKindColIdx)
 
         val globalDynamicBucketProcessor =
           GlobalDynamicBucketProcessor(
@@ -378,11 +375,11 @@ case class PaimonSparkWriter(table: FileStoreTable, writeRowTracking: Boolean =
   private def bootstrapAndRepartitionByKeyHash(
       data: DataFrame,
       parallelism: Int,
-      rowKindColIdx: Int,
-      rowType: RowType): RDD[(KeyPartOrRow, Array[Byte])] = {
+      rowKindColIdx: Int): RDD[(KeyPartOrRow, Array[Byte])] = {
     val numSparkPartitions = data.rdd.getNumPartitions
     val primaryKeys = table.schema().primaryKeys()
     val bootstrapType = IndexBootstrap.bootstrapType(table.schema())
+    val rowType = table.rowType()
 
     data.rdd
       .mapPartitions { iter =>
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DynamicBucketTableTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DynamicBucketTableTest.scala
index f005156182..fee7973aca 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DynamicBucketTableTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DynamicBucketTableTest.scala
@@ -200,4 +200,25 @@ class DynamicBucketTableTest extends PaimonSparkTestBase {
 
     checkAnswer(sql("SELECT DISTINCT bucket FROM `T$FILES`"), Seq(Row(0), Row(1), Row(2)))
   }
+
+  test("Paimon cross partition table: write multiple cols") {
+    withTable("t") {
+      val columns = (0 until 118).map(i => s"c$i STRING").mkString(", ")
+      sql(s"""
+             |CREATE TABLE t (
+             |  $columns,
+             |  pt STRING
+             |)
+             |PARTITIONED BY (pt)
+             |TBLPROPERTIES (
+             |  'primary-key' = 'c0');
+             |""".stripMargin)
+
+      sql("INSERT INTO t (c0, c1, c2, pt) VALUES ('c0','c1','c2','pt')")
+      checkAnswer(
+        sql("SELECT c0, c1, c2, pt from t"),
+        Row("c0", "c1", "c2", "pt")
+      )
+    }
+  }
 }
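
For context, a minimal standalone sketch of the scenario exercised by the new test, assuming a local SparkSession with the Paimon Spark catalog registered. The catalog name "paimon", the warehouse path, the object name, and the table name below are illustrative placeholders, not part of this commit.

import org.apache.spark.sql.SparkSession

object CrossPartitionWriteSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[2]")
      .config("spark.sql.catalog.paimon", "org.apache.paimon.spark.SparkCatalog")
      .config("spark.sql.catalog.paimon.warehouse", "file:/tmp/paimon-warehouse")
      .getOrCreate()

    // Many columns plus a partition column; the primary key (c0) does not
    // contain the partition key (pt), so writes take the cross-partition
    // (global dynamic bucket) path that calls bootstrapAndRepartitionByKeyHash.
    val columns = (0 until 118).map(i => s"c$i STRING").mkString(", ")
    spark.sql(
      s"""CREATE TABLE paimon.default.t ($columns, pt STRING)
         |PARTITIONED BY (pt)
         |TBLPROPERTIES ('primary-key' = 'c0')
         |""".stripMargin)

    // Write only a subset of the columns, as in the test; the remaining
    // columns are left null.
    spark.sql(
      "INSERT INTO paimon.default.t (c0, c1, c2, pt) VALUES ('c0', 'c1', 'c2', 'pt')")

    spark.sql("SELECT c0, c1, c2, pt FROM paimon.default.t").show()

    spark.stop()
  }
}

With the change above, the bootstrap derives the row type from table.rowType() inside bootstrapAndRepartitionByKeyHash instead of taking it from the caller, which is what the wide-table write in this sketch relies on.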
