This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 21ecb7560b [spark] Fix insert with write.merge-schema (#6041)
21ecb7560b is described below

commit 21ecb7560b36b56187c8cef9fe3514dd8c4318a5
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Aug 8 10:06:52 2025 +0800

    [spark] Fix insert with write.merge-schema (#6041)
---
 docs/README.md                                     |   2 +-
 docs/content/spark/sql-write.md                    |  18 ++++
 .../spark/catalyst/analysis/PaimonAnalysis.scala   |  43 +++++++--
 .../paimon/spark/commands/SchemaHelper.scala       |  39 +++++++-
 .../spark/commands/WriteIntoPaimonTable.scala      |  24 +----
 .../org/apache/paimon/spark/util/OptionUtils.scala |   8 ++
 .../paimon/spark/sql/WriteMergeSchemaTest.scala    | 103 +++++++++++++++++++++
 7 files changed, 202 insertions(+), 35 deletions(-)

diff --git a/docs/README.md b/docs/README.md
index c6901b5953..fb96248516 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -25,7 +25,7 @@ From this directory:
        hugo -b "" serve
        ```
 
-The site can be viewed at http://localhost:1313/docs/master/
+The site can be viewed at http://localhost:1313/
 
 # Contribute
 
diff --git a/docs/content/spark/sql-write.md b/docs/content/spark/sql-write.md
index 1f4407ef89..fb01b22058 100644
--- a/docs/content/spark/sql-write.md
+++ b/docs/content/spark/sql-write.md
@@ -253,6 +253,12 @@ val stream = df
 
 ## Schema Evolution
 
+{{< hint info >}}
+
+Since the table schema may be updated during writing, catalog caching needs to be disabled to use this feature. Configure `spark.sql.catalog.<catalogName>.cache-enabled` to `false`.
+
+{{< /hint >}}
+
 Schema evolution is a feature that allows users to easily modify the current schema of a table to adapt to existing data, or new data that changes over time, while maintaining data integrity and consistency.
 
 Paimon supports automatic schema merging of source data and current table data while data is being written, and uses the merged schema as the latest schema of the table, and it only requires configuring `write.merge-schema`.
@@ -306,3 +312,15 @@ Here list the configurations.
         </tr>
     </tbody>
 </table>
+
+This feature is also supported in Spark SQL. Here is an example:
+
+```sql
+set `spark.paimon.write.merge-schema` = true;
+
+CREATE TABLE t (a INT, b STRING);
+INSERT INTO t VALUES (1, '1'), (2, '2');
+
+-- requires using BY NAME mode
+INSERT INTO t BY NAME SELECT 3 AS a, '3' AS b, 3 AS c;
+```
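[Editor's note] The same flow is available through the DataFrame writer, as exercised by the new test below. A minimal sketch under stated assumptions: the catalog name "paimon", the warehouse path, and the local session setup are not part of this commit; the `write.merge-schema` option and the `cache-enabled` requirement come from the change above.

```scala
// Minimal sketch, assuming a local Spark session and a catalog named "paimon".
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.catalog.paimon", "org.apache.paimon.spark.SparkCatalog")
  .config("spark.sql.catalog.paimon.warehouse", "file:/tmp/paimon") // assumed path
  // Disable catalog caching so schema changes made during the write are visible,
  // as the hint above requires.
  .config("spark.sql.catalog.paimon.cache-enabled", "false")
  .getOrCreate()
import spark.implicits._

spark.sql("USE paimon.default")
spark.sql("CREATE TABLE t (a INT, b STRING)")
spark.sql("INSERT INTO t VALUES (1, '1'), (2, '2')")

// DataFrame write with an extra column `c`; with write.merge-schema the column
// is added to the table schema and earlier rows read it back as NULL.
Seq((3, "3", 3)).toDF("a", "b", "c")
  .write
  .format("paimon")
  .mode("append")
  .option("write.merge-schema", "true")
  .saveAsTable("t")
```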
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala
index fc713f91be..f44d9274e4 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala
@@ -18,10 +18,11 @@
 
 package org.apache.paimon.spark.catalyst.analysis
 
-import org.apache.paimon.spark.SparkTable
+import org.apache.paimon.spark.{SparkConnectorOptions, SparkTable}
 import org.apache.paimon.spark.catalyst.Compatibility
 import org.apache.paimon.spark.catalyst.analysis.PaimonRelation.isPaimonTable
 import org.apache.paimon.spark.commands.{PaimonAnalyzeTableColumnCommand, PaimonDynamicPartitionOverwriteCommand, PaimonShowColumnsCommand, PaimonTruncateTableCommand}
+import org.apache.paimon.spark.util.OptionUtils
 import org.apache.paimon.table.FileStoreTable
 
 import org.apache.spark.sql.SparkSession
@@ -40,12 +41,14 @@ import scala.collection.mutable
 class PaimonAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDown {
+
     case a @ PaimonV2WriteCommand(table) if !paimonWriteResolved(a.query, table) =>
-      val newQuery = resolveQueryColumns(a.query, table, a.isByName)
-      if (newQuery != a.query) {
-        Compatibility.withNewQuery(a, newQuery)
-      } else {
-        a
+      val mergeSchemaEnabled =
+        writeOptions(a).get(SparkConnectorOptions.MERGE_SCHEMA.key()).contains("true") ||
+          OptionUtils.writeMergeSchemaEnabled()
+      resolveQueryColumns(a.query, table, a.isByName, mergeSchemaEnabled) match {
+        case Some(newQuery) if newQuery != a.query => Compatibility.withNewQuery(a, newQuery)
+        case _ => a
       }
 
     case o @ PaimonDynamicPartitionOverwrite(r, d) if o.resolved =>
@@ -58,6 +61,15 @@ class PaimonAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
       PaimonShowColumnsCommand(table)
   }
 
+  private def writeOptions(v2WriteCommand: V2WriteCommand): Map[String, String] = {
+    v2WriteCommand match {
+      case a: AppendData => a.writeOptions
+      case o: OverwriteByExpression => o.writeOptions
+      case op: OverwritePartitionsDynamic => op.writeOptions
+      case _ => Map.empty[String, String]
+    }
+  }
+
   private def paimonWriteResolved(query: LogicalPlan, table: NamedRelation): Boolean = {
     query.output.size == table.output.size &&
     query.output.zip(table.output).forall {
@@ -71,12 +83,25 @@ class PaimonAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
   private def resolveQueryColumns(
       query: LogicalPlan,
       table: NamedRelation,
-      byName: Boolean): LogicalPlan = {
+      byName: Boolean,
+      mergeSchemaEnabled: Boolean = false): Option[LogicalPlan] = {
     // More details see: `TableOutputResolver#resolveOutputColumns`
     if (byName) {
-      resolveQueryColumnsByName(query, table)
+      try {
+        Option.apply(resolveQueryColumnsByName(query, table))
+      } catch {
+        case e: Exception =>
+          // Merge schema is effective only when using byName mode.
+          // Schema validation is skipped here, because schema validation or merging will be
+          // done during insertion when mergeSchemaEnabled.
+          if (mergeSchemaEnabled) {
+            Option.empty
+          } else {
+            throw e
+          }
+      }
     } else {
-      resolveQueryColumnsByPosition(query, table)
+      Option.apply(resolveQueryColumnsByPosition(query, table))
     }
   }
 
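[Editor's note] The analysis change above boils down to: attempt by-name resolution, and if it fails while merge-schema is enabled, leave the plan untouched so the schema can be merged at insertion time. A standalone sketch of that pattern; `resolveByName` here is a hypothetical stand-in, not the real method.

```scala
// Generic sketch of the deferred-resolution pattern; `resolveByName` is a
// hypothetical by-name resolution step that throws when query and table
// schemas do not line up.
import scala.util.{Failure, Success, Try}

def tryResolveByName[A](mergeSchemaEnabled: Boolean)(resolveByName: => A): Option[A] =
  Try(resolveByName) match {
    case Success(resolved) => Some(resolved)
    // With merge-schema enabled, defer: validation and schema merging happen
    // later, during the actual insertion.
    case Failure(_) if mergeSchemaEnabled => None
    case Failure(e) => throw e
  }
```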
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SchemaHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SchemaHelper.scala
index fdabe6c436..d66a941929 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SchemaHelper.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SchemaHelper.scala
@@ -18,10 +18,15 @@
 
 package org.apache.paimon.spark.commands
 
-import org.apache.paimon.spark.SparkTypeUtils
+import org.apache.paimon.options.Options
+import org.apache.paimon.spark.{SparkConnectorOptions, SparkTypeUtils}
+import org.apache.paimon.spark.schema.SparkSystemColumns
+import org.apache.paimon.spark.util.OptionUtils
 import org.apache.paimon.table.FileStoreTable
 import org.apache.paimon.types.RowType
 
+import org.apache.spark.sql.{DataFrame, PaimonUtils, SparkSession}
+import org.apache.spark.sql.functions.{col, lit}
 import org.apache.spark.sql.types.StructType
 
 import scala.collection.JavaConverters._
@@ -34,7 +39,36 @@ private[spark] trait SchemaHelper extends WithFileStoreTable {
 
   override def table: FileStoreTable = newTable.getOrElse(originTable)
 
-  def mergeAndCommitSchema(dataSchema: StructType, allowExplicitCast: Boolean): Unit = {
+  def mergeSchema(sparkSession: SparkSession, input: DataFrame, options: Options): DataFrame = {
+    val mergeSchemaEnabled =
+      options.get(SparkConnectorOptions.MERGE_SCHEMA) || OptionUtils.writeMergeSchemaEnabled()
+    if (!mergeSchemaEnabled) {
+      return input
+    }
+
+    val dataSchema = SparkSystemColumns.filterSparkSystemColumns(input.schema)
+    val allowExplicitCast = options.get(SparkConnectorOptions.EXPLICIT_CAST) || OptionUtils
+      .writeMergeSchemaExplicitCastEnabled()
+    mergeAndCommitSchema(dataSchema, allowExplicitCast)
+
+    // For the case that some columns are absent in the data, we still allow the write once write.merge-schema is true.
+    val newTableSchema = SparkTypeUtils.fromPaimonRowType(table.schema().logicalRowType())
+    if (!PaimonUtils.sameType(newTableSchema, dataSchema)) {
+      val resolve = sparkSession.sessionState.conf.resolver
+      val cols = newTableSchema.map {
+        field =>
+          dataSchema.find(f => resolve(f.name, field.name)) match {
+            case Some(f) => col(f.name)
+            case _ => lit(null).as(field.name)
+          }
+      }
+      input.select(cols: _*)
+    } else {
+      input
+    }
+  }
+
+  private def mergeAndCommitSchema(dataSchema: StructType, allowExplicitCast: Boolean): Unit = {
     val dataRowType = SparkTypeUtils.toPaimonType(dataSchema).asInstanceOf[RowType]
     if (table.store().mergeSchema(dataRowType, allowExplicitCast)) {
       newTable = Some(table.copyWithLatestSchema())
@@ -44,5 +78,4 @@ private[spark] trait SchemaHelper extends WithFileStoreTable {
   def updateTableWithOptions(options: Map[String, String]): Unit = {
     newTable = Some(table.copy(options.asJava))
   }
-
 }
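[Editor's note] The column-alignment step inside `mergeSchema` above can be illustrated on its own: select columns in the (possibly widened) table order, matching by the session resolver and padding absent ones with NULL. A minimal sketch outside of Paimon, assuming a plain Spark session; the helper name is invented for illustration.

```scala
// Sketch of aligning an input DataFrame to a wider target schema.
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.StructType

def alignToSchema(spark: SparkSession, input: DataFrame, target: StructType): DataFrame = {
  // Use the session's resolver so column matching honours spark.sql.caseSensitive.
  val resolve = spark.sessionState.conf.resolver
  val cols = target.map { field =>
    input.schema.find(f => resolve(f.name, field.name)) match {
      case Some(f) => col(f.name)               // column present in the data
      case None    => lit(null).as(field.name)  // absent column padded with NULL
    }
  }
  input.select(cols: _*)
}
```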
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
index c56000ff99..5a1beff25e 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions.DYNAMIC_PARTITION_OVERWRITE
 import org.apache.paimon.options.Options
 import org.apache.paimon.spark._
 import org.apache.paimon.spark.schema.SparkSystemColumns
+import org.apache.paimon.spark.util.OptionUtils
 import org.apache.paimon.table.FileStoreTable
 
 import org.apache.spark.internal.Logging
@@ -43,29 +44,8 @@ case class WriteIntoPaimonTable(
   with SchemaHelper
   with Logging {
 
-  private lazy val mergeSchema = options.get(SparkConnectorOptions.MERGE_SCHEMA)
-
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    var data = _data
-    if (mergeSchema) {
-      val dataSchema = SparkSystemColumns.filterSparkSystemColumns(data.schema)
-      val allowExplicitCast = options.get(SparkConnectorOptions.EXPLICIT_CAST)
-      mergeAndCommitSchema(dataSchema, allowExplicitCast)
-
-      // For case that some columns is absent in data, we still allow to write once write.merge-schema is true.
-      val newTableSchema = SparkTypeUtils.fromPaimonRowType(table.schema().logicalRowType())
-      if (!PaimonUtils.sameType(newTableSchema, dataSchema)) {
-        val resolve = sparkSession.sessionState.conf.resolver
-        val cols = newTableSchema.map {
-          field =>
-            dataSchema.find(f => resolve(f.name, field.name)) match {
-              case Some(f) => col(f.name)
-              case _ => lit(null).as(field.name)
-            }
-        }
-        data = data.select(cols: _*)
-      }
-    }
+    val data = mergeSchema(sparkSession, _data, options)
 
     val (dynamicPartitionOverwriteMode, overwritePartition) = parseSaveMode()
     // use the extra options to rebuild the table object
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
index ce66bb838d..253c0dac89 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
@@ -63,6 +63,14 @@ object OptionUtils extends SQLConfHelper {
     getOptionString(SparkConnectorOptions.USE_V2_WRITE).toBoolean
   }
 
+  def writeMergeSchemaEnabled(): Boolean = {
+    getOptionString(SparkConnectorOptions.MERGE_SCHEMA).toBoolean
+  }
+
+  def writeMergeSchemaExplicitCastEnabled(): Boolean = {
+    getOptionString(SparkConnectorOptions.EXPLICIT_CAST).toBoolean
+  }
+
   def extractCatalogName(): Option[String] = {
     val sparkCatalogTemplate = String.format("%s([^.]*)$", SPARK_CATALOG_PREFIX)
     val sparkCatalogPattern = Pattern.compile(sparkCatalogTemplate)
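[Editor's note] The new `OptionUtils` helpers read session-level switches, so the feature can be toggled in SQL without per-write options. A hedged sketch of how they are driven, assuming an existing `spark` session; the `spark.paimon.write.merge-schema` key matches the docs and test in this commit, while the explicit-cast key name is an assumption based on `SparkConnectorOptions.EXPLICIT_CAST`.

```scala
// Session-level toggles; key names beyond write.merge-schema are assumptions.
spark.sql("SET spark.paimon.write.merge-schema = true")
// Optionally allow explicit (possibly lossy) casts while merging schemas (assumed key).
spark.sql("SET spark.paimon.write.merge-schema.explicit-cast = true")

// With the switch on, a BY NAME insert may introduce new columns.
spark.sql("INSERT INTO t BY NAME SELECT 3 AS a, '3' AS b, 3 AS c")
```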
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/WriteMergeSchemaTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/WriteMergeSchemaTest.scala
new file mode 100644
index 0000000000..de506cc5c2
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/WriteMergeSchemaTest.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.Row
+
+class WriteMergeSchemaTest extends PaimonSparkTestBase {
+
+  // todo: fix this
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf.set("spark.sql.catalog.paimon.cache-enabled", "false")
+  }
+
+  import testImplicits._
+
+  test("Write merge schema: dataframe write") {
+    withTable("t") {
+      sql("CREATE TABLE t (a INT, b STRING)")
+      Seq((1, "1"), (2, "2"))
+        .toDF("a", "b")
+        .write
+        .format("paimon")
+        .mode("append")
+        .saveAsTable("t")
+
+      // new columns
+      Seq((3, "3", 3))
+        .toDF("a", "b", "c")
+        .write
+        .format("paimon")
+        .mode("append")
+        .option("write.merge-schema", "true")
+        .saveAsTable("t")
+      checkAnswer(
+        sql("SELECT * FROM t ORDER BY a"),
+        Seq(Row(1, "1", null), Row(2, "2", null), Row(3, "3", 3))
+      )
+
+      // missing columns and new columns
+      Seq(("4", "4", 4))
+        .toDF("d", "b", "c")
+        .write
+        .format("paimon")
+        .mode("append")
+        .option("write.merge-schema", "true")
+        .saveAsTable("t")
+      checkAnswer(
+        sql("SELECT * FROM t ORDER BY a"),
+        Seq(
+          Row(null, "4", 4, "4"),
+          Row(1, "1", null, null),
+          Row(2, "2", null, null),
+          Row(3, "3", 3, null))
+      )
+    }
+  }
+
+  test("Write merge schema: sql write") {
+    withTable("t") {
+      withSparkSQLConf("spark.paimon.write.merge-schema" -> "true") {
+        sql("CREATE TABLE t (a INT, b STRING)")
+        sql("INSERT INTO t VALUES (1, '1'), (2, '2')")
+
+        // new columns
+        sql("INSERT INTO t BY NAME SELECT 3 AS a, '3' AS b, 3 AS c")
+        checkAnswer(
+          sql("SELECT * FROM t ORDER BY a"),
+          Seq(Row(1, "1", null), Row(2, "2", null), Row(3, "3", 3))
+        )
+
+        // missing columns and new columns
+        sql("INSERT INTO t BY NAME SELECT '4' AS d, '4' AS b, 4 AS c")
+        checkAnswer(
+          sql("SELECT * FROM t ORDER BY a"),
+          Seq(
+            Row(null, "4", 4, "4"),
+            Row(1, "1", null, null),
+            Row(2, "2", null, null),
+            Row(3, "3", 3, null))
+        )
+      }
+    }
+  }
+}
