This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b874f21cf0 [spark] Fix the column projection when writing by 
DataFrameWriter (#5164)
b874f21cf0 is described below

commit b874f21cf087aae241ea93b356982e32c62c7aa7
Author: WenjunMin <[email protected]>
AuthorDate: Thu Feb 27 19:06:10 2025 +0800

    [spark] Fix the column projection when writing by DataFrameWriter (#5164)
---
 .../paimon/spark/commands/PaimonSparkWriter.scala    |  8 +++++++-
 .../apache/paimon/spark/sql/DataFrameWriteTest.scala | 20 ++++++++++++++++++++
 2 files changed, 27 insertions(+), 1 deletion(-)

diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index 061337b56f..95be75c19a 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -399,7 +399,13 @@ case class PaimonSparkWriter(table: FileStoreTable) {
   }
 
   private def repartitionByPartitionsAndBucket(df: DataFrame): DataFrame = {
-    val partitionCols = tableSchema.partitionKeys().asScala.map(col).toSeq
+    val inputSchema = df.schema
+    val partitionCols = tableSchema
+      .partitionKeys()
+      .asScala
+      .map(tableSchema.fieldNames().indexOf(_))
+      .map(x => col(inputSchema.fieldNames(x)))
+      .toSeq
     df.repartition(partitionCols ++ Seq(col(BUCKET_COL)): _*)
   }
 
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
index 77c5180e71..3a0273d728 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
@@ -61,6 +61,26 @@ class DataFrameWriteTest extends PaimonSparkTestBase {
     
Assertions.assertFalse(paimonTable.options().containsKey("write.merge-schema.explicit-cast"))
   }
 
+  test("Paimon: DataFrameWrite partition table") {
+    withTable("t") {
+      spark.sql(s"""
+                   |CREATE TABLE t (a INT, b STRING, dt STRING) PARTITIONED 
BY(dt)
+                   |TBLPROPERTIES ('file.format' = 'avro', 'bucket' = 2, 
'bucket-key' = 'b')
+                   |""".stripMargin)
+
+      val table = loadTable("t")
+      val location = table.location().toString
+
+      Seq((1, "x1", "a"), (2, "x2", "b"))
+        .toDF("a", "b", "c")
+        .write
+        .format("paimon")
+        .mode("append")
+        .save(location)
+      checkAnswer(sql("SELECT * FROM t"), Row(1, "x1", "a") :: Row(2, "x2", 
"b") :: Nil)
+    }
+  }
+
   fileFormats.foreach {
     fileFormat =>
       test(s"Paimon: DataFrameWrite.saveAsTable in ByName mode, file.format: 
$fileFormat") {

Reply via email to