This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4f9f28e4c1 [spark] Fix write multiple cols with key dynamic table (#6585)
4f9f28e4c1 is described below
commit 4f9f28e4c1ecee3fc4abe9fcf5f24bfe150a88f4
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Nov 11 22:02:03 2025 +0800
[spark] Fix write multiple cols with key dynamic table (#6585)
---
.../paimon/spark/commands/PaimonSparkWriter.scala | 11 ++++-------
.../paimon/spark/sql/DynamicBucketTableTest.scala | 21 +++++++++++++++++++++
2 files changed, 25 insertions(+), 7 deletions(-)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index da9a53d75b..515464ef14 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -207,11 +207,8 @@ case class PaimonSparkWriter(
val assignerParallelism =
Option(coreOptions.dynamicBucketAssignerParallelism)
.map(_.toInt)
.getOrElse(sparkParallelism)
- val bootstrapped = bootstrapAndRepartitionByKeyHash(
- withInitBucketCol,
- assignerParallelism,
- rowKindColIdx,
- rowType)
+ val bootstrapped =
+ bootstrapAndRepartitionByKeyHash(withInitBucketCol, assignerParallelism, rowKindColIdx)
val globalDynamicBucketProcessor =
GlobalDynamicBucketProcessor(
@@ -430,11 +427,11 @@ case class PaimonSparkWriter(
private def bootstrapAndRepartitionByKeyHash(
data: DataFrame,
parallelism: Int,
- rowKindColIdx: Int,
- rowType: RowType): RDD[(KeyPartOrRow, Array[Byte])] = {
+ rowKindColIdx: Int): RDD[(KeyPartOrRow, Array[Byte])] = {
val numSparkPartitions = data.rdd.getNumPartitions
val primaryKeys = table.schema().primaryKeys()
val bootstrapType = IndexBootstrap.bootstrapType(table.schema())
+ val rowType = table.rowType()
data.rdd
.mapPartitions {
iter =>
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DynamicBucketTableTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DynamicBucketTableTest.scala
index f005156182..fee7973aca 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DynamicBucketTableTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DynamicBucketTableTest.scala
@@ -200,4 +200,25 @@ class DynamicBucketTableTest extends PaimonSparkTestBase {
checkAnswer(sql("SELECT DISTINCT bucket FROM `T$FILES`"), Seq(Row(0), Row(1), Row(2)))
}
+
+ test("Paimon cross partition table: write multiple cols") {
+ withTable("t") {
+ val columns = (0 until 118).map(i => s"c$i STRING").mkString(", ")
+ sql(s"""
+ |CREATE TABLE t (
+ | $columns,
+ | pt STRING
+ |)
+ |PARTITIONED BY (pt)
+ |TBLPROPERTIES (
+ | 'primary-key' = 'c0');
+ |""".stripMargin)
+
+ sql("INSERT INTO t (c0, c1, c2, pt) VALUES ('c0','c1','c2','pt')")
+ checkAnswer(
+ sql("SELECT c0, c1, c2, pt from t"),
+ Row("c0", "c1", "c2", "pt")
+ )
+ }
+ }
}