This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b874f21cf0 [spark] Fix the column projection when writing by DataFrameWriter (#5164)
b874f21cf0 is described below
commit b874f21cf087aae241ea93b356982e32c62c7aa7
Author: WenjunMin <[email protected]>
AuthorDate: Thu Feb 27 19:06:10 2025 +0800
[spark] Fix the column projection when writing by DataFrameWriter (#5164)
---
.../paimon/spark/commands/PaimonSparkWriter.scala | 8 +++++++-
.../apache/paimon/spark/sql/DataFrameWriteTest.scala | 20 ++++++++++++++++++++
2 files changed, 27 insertions(+), 1 deletion(-)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index 061337b56f..95be75c19a 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -399,7 +399,13 @@ case class PaimonSparkWriter(table: FileStoreTable) {
}
private def repartitionByPartitionsAndBucket(df: DataFrame): DataFrame = {
- val partitionCols = tableSchema.partitionKeys().asScala.map(col).toSeq
+ val inputSchema = df.schema
+ val partitionCols = tableSchema
+ .partitionKeys()
+ .asScala
+ .map(tableSchema.fieldNames().indexOf(_))
+ .map(x => col(inputSchema.fieldNames(x)))
+ .toSeq
df.repartition(partitionCols ++ Seq(col(BUCKET_COL)): _*)
}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
index 77c5180e71..3a0273d728 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
@@ -61,6 +61,26 @@ class DataFrameWriteTest extends PaimonSparkTestBase {
Assertions.assertFalse(paimonTable.options().containsKey("write.merge-schema.explicit-cast"))
}
+ test("Paimon: DataFrameWrite partition table") {
+ withTable("t") {
+ spark.sql(s"""
+ |CREATE TABLE t (a INT, b STRING, dt STRING) PARTITIONED BY(dt)
+ |TBLPROPERTIES ('file.format' = 'avro', 'bucket' = 2, 'bucket-key' = 'b')
+ |""".stripMargin)
+
+ val table = loadTable("t")
+ val location = table.location().toString
+
+ Seq((1, "x1", "a"), (2, "x2", "b"))
+ .toDF("a", "b", "c")
+ .write
+ .format("paimon")
+ .mode("append")
+ .save(location)
+ checkAnswer(sql("SELECT * FROM t"), Row(1, "x1", "a") :: Row(2, "x2", "b") :: Nil)
+ }
+ }
+
fileFormats.foreach {
fileFormat =>
test(s"Paimon: DataFrameWrite.saveAsTable in ByName mode, file.format: $fileFormat") {