This is an automated email from the ASF dual-hosted git repository.
zouxxyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 7f01330cc4 [Spark] Allows data with missing columns to be written when 'merge-schema' is enabled (#5059)
7f01330cc4 is described below
commit 7f01330cc4b3b554ab37e2b8742cf13eaead9ba5
Author: Yann Byron <[email protected]>
AuthorDate: Wed Feb 12 17:50:15 2025 +0800
[Spark] Allows data with missing columns to be written when 'merge-schema' is enabled (#5059)
---
.../spark/commands/WriteIntoPaimonTable.scala | 20 +++++++++-
.../scala/org/apache/spark/sql/PaimonUtils.scala | 6 ++-
.../paimon/spark/sql/DataFrameWriteTest.scala | 46 ++++++++++++++++++++++
3 files changed, 69 insertions(+), 3 deletions(-)
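For quick orientation, here is a minimal usage sketch of the behavior this patch enables, adapted from the new test added below; the table definition and the `location` variable are illustrative, not part of the patch:

    // Assume a Paimon table T(a INT, b STRING, c DOUBLE, d TIMESTAMP) already exists at `location`.
    // The incoming DataFrame omits columns `b` and `d`.
    val df = Seq((4, 45.6d), (5, 56.7d)).toDF("a", "c")
    df.write
      .format("paimon")
      .mode("append")
      .option("write.merge-schema", "true") // schema merging must be on for the missing columns to be tolerated
      .save(location)
    // The absent columns are filled with null for the newly written rows.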
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
index 80dd6ae425..eae8b4f146 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
@@ -29,9 +29,10 @@ import org.apache.paimon.table.sink.CommitMessage
import org.apache.paimon.utils.{InternalRowPartitionComputer, PartitionPathUtils, TypeUtils}
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.{DataFrame, PaimonUtils, Row, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.functions.{col, lit}
import scala.collection.JavaConverters._
@@ -39,7 +40,7 @@ import scala.collection.JavaConverters._
case class WriteIntoPaimonTable(
override val originTable: FileStoreTable,
saveMode: SaveMode,
- data: DataFrame,
+ _data: DataFrame,
options: Options)
extends RunnableCommand
with PaimonCommand
@@ -49,10 +50,25 @@ case class WriteIntoPaimonTable(
private lazy val mergeSchema =
options.get(SparkConnectorOptions.MERGE_SCHEMA)
override def run(sparkSession: SparkSession): Seq[Row] = {
+ var data = _data
if (mergeSchema) {
val dataSchema = SparkSystemColumns.filterSparkSystemColumns(data.schema)
val allowExplicitCast = options.get(SparkConnectorOptions.EXPLICIT_CAST)
mergeAndCommitSchema(dataSchema, allowExplicitCast)
+
+ // When some columns are absent from the data, we still allow the write once write.merge-schema is true.
+ val newTableSchema = SparkTypeUtils.fromPaimonRowType(table.schema().logicalRowType())
+ if (!PaimonUtils.sameType(newTableSchema, dataSchema)) {
+ val resolve = sparkSession.sessionState.conf.resolver
+ val cols = newTableSchema.map {
+ field =>
+ dataSchema.find(f => resolve(f.name, field.name)) match {
+ case Some(f) => col(f.name)
+ case _ => lit(null).as(field.name)
+ }
+ }
+ data = data.select(cols: _*)
+ }
}
val (dynamicPartitionOverwriteMode, overwritePartition) = parseSaveMode()
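The added block above is the heart of the change: after the schema merge, if the table schema and the incoming data schema still differ, the DataFrame is re-projected to the table's column order and any absent column is padded with a null literal. A standalone sketch of that step, using only the Spark APIs that appear in the patch (the helper name alignToTableSchema is illustrative, not part of the change):

    import org.apache.spark.sql.{Column, DataFrame}
    import org.apache.spark.sql.functions.{col, lit}
    import org.apache.spark.sql.types.StructType

    def alignToTableSchema(data: DataFrame, tableSchema: StructType): DataFrame = {
      // Spark's resolver honors spark.sql.caseSensitive when matching column names.
      val resolver = data.sparkSession.sessionState.conf.resolver
      val cols: Seq[Column] = tableSchema.map {
        field =>
          data.schema.find(f => resolver(f.name, field.name)) match {
            case Some(f) => col(f.name) // column present in the data: keep it
            case None => lit(null).as(field.name) // column absent: pad with null
          }
      }
      data.select(cols: _*)
    }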
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
index a1ce251374..9023bfa646 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy.translateFilterV2WithMapping
import org.apache.spark.sql.internal.connector.PredicateUtils
import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.util.PartitioningUtils
import org.apache.spark.util.{Utils => SparkUtils}
@@ -121,4 +121,8 @@ object PaimonUtils {
partitionColumnNames: Seq[String]): Unit = {
PartitioningUtils.requireExactMatchedPartitionSpec(tableName, spec, partitionColumnNames)
}
+
+ def sameType(left: DataType, right: DataType): Boolean = {
+ left.sameType(right)
+ }
}
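The new sameType forwarder exists because DataType.sameType is not part of Spark's public API (it is package-private), so Paimon code outside org.apache.spark.sql cannot call it directly; the wrapper exposes it from a package where it is visible. It compares two types structurally, ignoring nullability (and field-name case unless spark.sql.caseSensitive is set). A hypothetical comparison, with both schemas invented for illustration:

    import org.apache.spark.sql.PaimonUtils
    import org.apache.spark.sql.types._

    // Same fields and types, differing only in nullability.
    val tableSchema = StructType(Seq(StructField("a", IntegerType), StructField("b", StringType)))
    val dataSchema = StructType(Seq(StructField("a", IntegerType, nullable = false), StructField("b", StringType)))
    PaimonUtils.sameType(tableSchema, dataSchema) // true: nullability differences are ignored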
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
index edd092c85c..77c5180e71 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
@@ -506,4 +506,50 @@ class DataFrameWriteTest extends PaimonSparkTestBase {
}
}
}
+
+ test("Paimon Schema Evolution: some columns is absent in the coming data") {
+
+ spark.sql(s"""
+ |CREATE TABLE T (a INT, b STRING)
+ |""".stripMargin)
+
+ val paimonTable = loadTable("T")
+ val location = paimonTable.location().toString
+
+ val df1 = Seq((1, "2023-08-01"), (2, "2023-08-02")).toDF("a", "b")
+ df1.write.format("paimon").mode("append").save(location)
+ checkAnswer(
+ spark.sql("SELECT * FROM T ORDER BY a, b"),
+ Row(1, "2023-08-01") :: Row(2, "2023-08-02") :: Nil)
+
+ // Case 1: two additional fields: DoubleType and TimestampType
+ val ts = java.sql.Timestamp.valueOf("2023-08-01 10:00:00.0")
+ val df2 = Seq((1, "2023-08-01", 12.3d, ts), (3, "2023-08-03", 34.5d, ts))
+ .toDF("a", "b", "c", "d")
+ df2.write
+ .format("paimon")
+ .mode("append")
+ .option("write.merge-schema", "true")
+ .save(location)
+
+ // Case 2: columns b and d are absent in the coming data
+ val df3 = Seq((4, 45.6d), (5, 56.7d))
+ .toDF("a", "c")
+ df3.write
+ .format("paimon")
+ .mode("append")
+ .option("write.merge-schema", "true")
+ .save(location)
+ val expected3 = Row(1, "2023-08-01", null, null) :: Row(1, "2023-08-01", 12.3d, ts) ::
+ Row(2, "2023-08-02", null, null) :: Row(3, "2023-08-03", 34.5d, ts) ::
+ Row(4, null, 45.6d, null) :: Row(5, null, 56.7d, null) :: Nil
+ checkAnswer(spark.sql("SELECT * FROM T ORDER BY a, b"), expected3)
+ }
}