This is an automated email from the ASF dual-hosted git repository.

zouxxyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f01330cc4 [Spark] Allows data with missing columns to be written when enable 'merge-schema' (#5059)
7f01330cc4 is described below

commit 7f01330cc4b3b554ab37e2b8742cf13eaead9ba5
Author: Yann Byron <[email protected]>
AuthorDate: Wed Feb 12 17:50:15 2025 +0800

    [Spark] Allows data with missing columns to be written when enable 'merge-schema' (#5059)
---
 .../spark/commands/WriteIntoPaimonTable.scala      | 20 +++++++++-
 .../scala/org/apache/spark/sql/PaimonUtils.scala   |  6 ++-
 .../paimon/spark/sql/DataFrameWriteTest.scala      | 46 ++++++++++++++++++++++
 3 files changed, 69 insertions(+), 3 deletions(-)
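
For illustration only, the behavior this change enables, written with the Spark DataFrame API (table T, its columns, and the variable location mirror the new test below; location stands for the table's path and spark.implicits._ is assumed to be in scope for toDF):

    // T already has schema (a INT, b STRING, c DOUBLE, d TIMESTAMP) after earlier writes.
    // df is missing columns "b" and "d"; previously such a write was not allowed.
    val df = Seq((4, 45.6d), (5, 56.7d)).toDF("a", "c")
    df.write
      .format("paimon")
      .mode("append")
      .option("write.merge-schema", "true") // the missing-column case requires merge-schema
      .save(location)                       // location = table path, e.g. loadTable("T").location().toString
    // Columns "b" and "d" are filled with NULL for the appended rows.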

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
index 80dd6ae425..eae8b4f146 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
@@ -29,9 +29,10 @@ import org.apache.paimon.table.sink.CommitMessage
 import org.apache.paimon.utils.{InternalRowPartitionComputer, PartitionPathUtils, TypeUtils}
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.{DataFrame, PaimonUtils, Row, SparkSession}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.functions.{col, lit}
 
 import scala.collection.JavaConverters._
 
@@ -39,7 +40,7 @@ import scala.collection.JavaConverters._
 case class WriteIntoPaimonTable(
     override val originTable: FileStoreTable,
     saveMode: SaveMode,
-    data: DataFrame,
+    _data: DataFrame,
     options: Options)
   extends RunnableCommand
   with PaimonCommand
@@ -49,10 +50,25 @@ case class WriteIntoPaimonTable(
   private lazy val mergeSchema = options.get(SparkConnectorOptions.MERGE_SCHEMA)
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
+    var data = _data
     if (mergeSchema) {
       val dataSchema = SparkSystemColumns.filterSparkSystemColumns(data.schema)
       val allowExplicitCast = options.get(SparkConnectorOptions.EXPLICIT_CAST)
       mergeAndCommitSchema(dataSchema, allowExplicitCast)
+
+      // For the case that some columns are absent in the data, we still allow the write when write.merge-schema is true.
+      val newTableSchema = SparkTypeUtils.fromPaimonRowType(table.schema().logicalRowType())
+      if (!PaimonUtils.sameType(newTableSchema, dataSchema)) {
+        val resolve = sparkSession.sessionState.conf.resolver
+        val cols = newTableSchema.map {
+          field =>
+            dataSchema.find(f => resolve(f.name, field.name)) match {
+              case Some(f) => col(f.name)
+              case _ => lit(null).as(field.name)
+            }
+        }
+        data = data.select(cols: _*)
+      }
     }
 
     val (dynamicPartitionOverwriteMode, overwritePartition) = parseSaveMode()
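
In words: when merge-schema is enabled and the merged table schema no longer matches the incoming data schema, every table column is either selected from the data (name matching uses the session resolver, i.e. it follows Spark's case-sensitivity setting) or replaced by a NULL literal aliased to the column name, so rows with absent columns are padded with nulls. A minimal standalone sketch of that padding step, for illustration only (the helper name padMissingColumns is not part of the commit; the real logic runs inside WriteIntoPaimonTable.run against the merged Paimon schema):

    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.functions.{col, lit}
    import org.apache.spark.sql.types.StructType

    def padMissingColumns(spark: SparkSession, data: DataFrame, tableSchema: StructType): DataFrame = {
      val resolve = spark.sessionState.conf.resolver
      val cols = tableSchema.map { field =>
        data.schema.find(f => resolve(f.name, field.name)) match {
          case Some(f) => col(f.name)              // the column exists in the incoming data
          case None    => lit(null).as(field.name) // absent column becomes a NULL literal
        }
      }
      data.select(cols: _*)                        // pad and reorder to the table's column order
    }
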
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
index a1ce251374..9023bfa646 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy.translateFilterV2WithMapping
 import org.apache.spark.sql.internal.connector.PredicateUtils
 import org.apache.spark.sql.sources.Filter
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.util.PartitioningUtils
 import org.apache.spark.util.{Utils => SparkUtils}
 
@@ -121,4 +121,8 @@ object PaimonUtils {
       partitionColumnNames: Seq[String]): Unit = {
     PartitioningUtils.requireExactMatchedPartitionSpec(tableName, spec, partitionColumnNames)
   }
+
+  def sameType(left: DataType, right: DataType): Boolean = {
+    left.sameType(right)
+  }
 }
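
The new PaimonUtils.sameType is a thin wrapper over Spark's DataType.sameType, which compares two types while ignoring nullability (and ignores field-name case unless spark.sql.caseSensitive is set); the wrapper is needed because sameType is not public outside the org.apache.spark.sql package. A standalone illustration of the comparison semantics (not part of the commit):

    import org.apache.spark.sql.PaimonUtils
    import org.apache.spark.sql.types._

    val tableSchema = StructType(Seq(
      StructField("a", IntegerType, nullable = true),
      StructField("b", StringType, nullable = true)))
    val nonNullable = StructType(Seq(
      StructField("a", IntegerType, nullable = false),
      StructField("b", StringType, nullable = false)))
    val missingB = StructType(Seq(StructField("a", IntegerType)))

    PaimonUtils.sameType(tableSchema, nonNullable) // true: nullability differences are ignored
    PaimonUtils.sameType(tableSchema, missingB)    // false: column "b" is absent, so the write pads it with NULL
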
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
index edd092c85c..77c5180e71 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
@@ -506,4 +506,50 @@ class DataFrameWriteTest extends PaimonSparkTestBase {
         }
       }
   }
+
+  test("Paimon Schema Evolution: some columns is absent in the coming data") {
+
+    spark.sql(s"""
+                 |CREATE TABLE T (a INT, b STRING)
+                 |""".stripMargin)
+
+    val paimonTable = loadTable("T")
+    val location = paimonTable.location().toString
+
+    val df1 = Seq((1, "2023-08-01"), (2, "2023-08-02")).toDF("a", "b")
+    df1.write.format("paimon").mode("append").save(location)
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY a, b"),
+      Row(1, "2023-08-01") :: Row(2, "2023-08-02") :: Nil)
+
+    // Case 1: two additional fields: DoubleType and TimestampType
+    val ts = java.sql.Timestamp.valueOf("2023-08-01 10:00:00.0")
+    val df2 = Seq((1, "2023-08-01", 12.3d, ts), (3, "2023-08-03", 34.5d, ts))
+      .toDF("a", "b", "c", "d")
+    df2.write
+      .format("paimon")
+      .mode("append")
+      .option("write.merge-schema", "true")
+      .save(location)
+
+    // Case 2: columns b and d are absent in the coming data
+    val df3 = Seq((4, 45.6d), (5, 56.7d))
+      .toDF("a", "c")
+    df3.write
+      .format("paimon")
+      .mode("append")
+      .option("write.merge-schema", "true")
+      .save(location)
+    val expected3 =
+      Row(1, "2023-08-01", null, null) :: Row(1, "2023-08-01", 12.3d, ts) :: 
Row(
+        2,
+        "2023-08-02",
+        null,
+        null) :: Row(3, "2023-08-03", 34.5d, ts) :: Row(4, null, 45.6d, null) 
:: Row(
+        5,
+        null,
+        56.7d,
+        null) :: Nil
+    checkAnswer(spark.sql("SELECT * FROM T ORDER BY a, b"), expected3)
+  }
 }
