This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f1b91c4d9e [spark] Enable v2 dynamic partition overwrite (#5947)
f1b91c4d9e is described below

commit f1b91c4d9e8e70dadcdd18ce7b8ebd080bf49852
Author: Zouxxyy <[email protected]>
AuthorDate: Thu Jul 24 10:10:06 2025 +0800

    [spark] Enable v2 dynamic partition overwrite (#5947)
---
 .../scala/org/apache/paimon/spark/SparkTable.scala |  2 +-
 .../spark/catalyst/analysis/PaimonAnalysis.scala   | 11 +++--
 .../apache/paimon/spark/write/PaimonV2Write.scala  |  2 +-
 .../paimon/spark/write/PaimonV2WriteBuilder.scala  |  3 +-
 .../spark/sql/InsertOverwriteTableTestBase.scala   | 50 +++++++++++++---------
 5 files changed, 41 insertions(+), 27 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
index dbc57e3edc..a3fac1db79 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
@@ -102,12 +102,12 @@ case class SparkTable(table: Table)
     val capabilities = JEnumSet.of(
       TableCapability.BATCH_READ,
       TableCapability.OVERWRITE_BY_FILTER,
-      TableCapability.OVERWRITE_DYNAMIC,
       TableCapability.MICRO_BATCH_READ
     )
 
     if (useV2Write) {
       capabilities.add(TableCapability.BATCH_WRITE)
+      capabilities.add(TableCapability.OVERWRITE_DYNAMIC)
     } else {
       capabilities.add(TableCapability.ACCEPT_ANY_SCHEMA)
       capabilities.add(TableCapability.V1_BATCH_WRITE)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala
index 7909838668..fc713f91be 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala
@@ -26,13 +26,14 @@ import org.apache.paimon.table.FileStoreTable
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.analysis.{NamedRelation, ResolvedTable}
-import org.apache.spark.sql.catalyst.expressions.{Alias, ArrayTransform, Attribute, CreateNamedStruct, CreateStruct, Expression, GetArrayItem, GetStructField, If, IsNull, LambdaFunction, Literal, NamedExpression, NamedLambdaVariable}
+import org.apache.spark.sql.catalyst.expressions.{Alias, ArrayTransform, Attribute, CreateStruct, Expression, GetArrayItem, GetStructField, If, IsNull, LambdaFunction, Literal, NamedExpression, NamedLambdaVariable}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLId
+import org.apache.spark.sql.connector.catalog.TableCapability
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.types.{ArrayType, DataType, IntegerType, MapType, Metadata, StructField, StructType}
+import org.apache.spark.sql.types._
 
 import scala.collection.mutable
 
@@ -344,8 +345,12 @@ object PaimonV2WriteCommand {
 object PaimonDynamicPartitionOverwrite {
   def unapply(o: OverwritePartitionsDynamic): Option[(DataSourceV2Relation, FileStoreTable)] = {
     if (o.query.resolved) {
+      // when dynamic overwrite is not supported, fall back to the v1 write
       o.table match {
-        case r: DataSourceV2Relation if r.table.isInstanceOf[SparkTable] =>
+        case r: DataSourceV2Relation
+            if r.table.isInstanceOf[SparkTable] && !r.table
+              .capabilities()
+              .contains(TableCapability.OVERWRITE_DYNAMIC) =>
           Some((r, r.table.asInstanceOf[SparkTable].getTable.asInstanceOf[FileStoreTable]))
         case _ => None
       }
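
The comment in the hunk above captures the routing decision: the extractor now matches only Paimon tables that do not advertise OVERWRITE_DYNAMIC, so the rewrite to the v1 dynamic-overwrite command fires purely as a fallback, while v2-write tables keep Spark's native OverwritePartitionsDynamic plan. A condensed sketch of the guard (a standalone helper invented for illustration, not part of the patch):

    import org.apache.paimon.spark.SparkTable
    import org.apache.spark.sql.connector.catalog.{Table, TableCapability}

    // Hypothetical helper mirroring the matcher's condition: only a Paimon
    // table that does NOT advertise OVERWRITE_DYNAMIC (i.e. v1 write) is
    // rewritten to the legacy v1 dynamic-overwrite command.
    def shouldFallBackToV1(table: Table): Boolean =
      table.isInstanceOf[SparkTable] &&
        !table.capabilities().contains(TableCapability.OVERWRITE_DYNAMIC)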
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
index 5c325c26fd..8eaeffe2fc 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
@@ -45,7 +45,7 @@ class PaimonV2Write(
   with Logging {
 
   assert(
-    !(overwriteDynamic && overwritePartitions.nonEmpty),
+    !(overwriteDynamic && overwritePartitions.exists(_.nonEmpty)),
     "Cannot overwrite dynamically and by filter both")
 
   private val table =
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2WriteBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2WriteBuilder.scala
index 462fd3311f..90f30a3955 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2WriteBuilder.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2WriteBuilder.scala
@@ -59,11 +59,12 @@ class PaimonV2WriteBuilder(table: FileStoreTable, writeSchema: StructType)
   }
 
   override def overwriteDynamicPartitions(): WriteBuilder = {
-    if (overwritePartitions.isDefined) {
+    if (overwritePartitions.exists(_.nonEmpty)) {
       throw new IllegalArgumentException("Cannot overwrite dynamically and by filter both")
     }
 
     overwriteDynamic = true
+    overwritePartitions = Option.apply(Map.empty[String, String])
     this
   }
 }
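
Both new guards rely on the same Option semantics: after overwriteDynamicPartitions() runs, overwritePartitions is Some(Map.empty), which is nonEmpty as an Option yet carries no filter, so exists(_.nonEmpty) is what distinguishes "dynamic overwrite requested" from "overwrite by a real partition filter". A minimal sketch (values invented for illustration):

    // Option.nonEmpty only checks Some vs None; exists(_.nonEmpty) also
    // requires the wrapped map to carry at least one partition filter.
    val dynamic: Option[Map[String, String]] = Some(Map.empty)          // after overwriteDynamicPartitions()
    val byFilter: Option[Map[String, String]] = Some(Map("pt" -> "p1")) // after overwrite(filters)

    assert(dynamic.nonEmpty)            // true even though the map is empty
    assert(!dynamic.exists(_.nonEmpty)) // empty map: no static filter to conflict with
    assert(byFilter.exists(_.nonEmpty)) // real filter: combining it with dynamic must fail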
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala
index 82fb1aed0a..467cd72f77 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala
@@ -580,30 +580,38 @@ abstract class InsertOverwriteTableTestBase extends PaimonSparkTestBase {
     withTable("my_table") {
       sql("CREATE TABLE my_table (id INT, pt STRING) PARTITIONED BY (pt)")
 
-      for (mode <- Seq("static", "dynamic")) {
-        withSparkSQLConf("spark.sql.sources.partitionOverwriteMode" -> mode) {
-          sql("INSERT OVERWRITE my_table VALUES (1, 'p1'), (2, 'p2')")
-          // INSERT OVERWRITE table
-          sql("INSERT OVERWRITE my_table VALUES (3, 'p1')")
-          if (mode == "dynamic") {
-            checkAnswer(sql("SELECT * FROM my_table ORDER BY id"), Seq(Row(2, "p2"), Row(3, "p1")))
-          } else {
-            checkAnswer(sql("SELECT * FROM my_table ORDER BY id"), Row(3, "p1"))
-          }
+      for (useV2Write <- Seq("true", "false")) {
+        for (mode <- Seq("static", "dynamic")) {
+          withSparkSQLConf(
+            "spark.sql.sources.partitionOverwriteMode" -> mode,
+            "spark.paimon.write.use-v2-write" -> useV2Write) {
+            sql("INSERT OVERWRITE my_table VALUES (1, 'p1'), (2, 'p2')")
+            // INSERT OVERWRITE table
+            sql("INSERT OVERWRITE my_table VALUES (3, 'p1')")
+            if (mode == "dynamic") {
+              checkAnswer(
+                sql("SELECT * FROM my_table ORDER BY id"),
+                Seq(Row(2, "p2"), Row(3, "p1")))
+            } else {
+              checkAnswer(sql("SELECT * FROM my_table ORDER BY id"), Row(3, "p1"))
+            }
 
-          sql("INSERT OVERWRITE my_table VALUES (1, 'p1'), (2, 'p2')")
-          // INSERT OVERWRITE table PARTITION (pt)
-          sql("INSERT OVERWRITE my_table PARTITION (pt) VALUES (3, 'p1')")
-          if (mode == "dynamic") {
+            sql("INSERT OVERWRITE my_table VALUES (1, 'p1'), (2, 'p2')")
+            // INSERT OVERWRITE table PARTITION (pt)
+            sql("INSERT OVERWRITE my_table PARTITION (pt) VALUES (3, 'p1')")
+            if (mode == "dynamic") {
+              checkAnswer(
+                sql("SELECT * FROM my_table ORDER BY id"),
+                Seq(Row(2, "p2"), Row(3, "p1")))
+            } else {
+              checkAnswer(sql("SELECT * FROM my_table ORDER BY id"), Row(3, "p1"))
+            }
+
+            sql("INSERT OVERWRITE my_table VALUES (1, 'p1'), (2, 'p2')")
+            // INSERT OVERWRITE table PARTITION (pt='p1')
+            sql("INSERT OVERWRITE my_table PARTITION (pt='p1') VALUES (3)")
             checkAnswer(sql("SELECT * FROM my_table ORDER BY id"), Seq(Row(2, "p2"), Row(3, "p1")))
-          } else {
-            checkAnswer(sql("SELECT * FROM my_table ORDER BY id"), Row(3, "p1"))
           }
-
-          sql("INSERT OVERWRITE my_table VALUES (1, 'p1'), (2, 'p2')")
-          // INSERT OVERWRITE table PARTITION (pt='p1')
-          sql("INSERT OVERWRITE my_table PARTITION (pt='p1') VALUES (3)")
-          checkAnswer(sql("SELECT * FROM my_table ORDER BY id"), Seq(Row(2, "p2"), Row(3, "p1")))
         }
       }
     }
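
For reference, the behavior the updated test pins down, reduced to a minimal sketch (configuration keys as they appear in the test; assumes a SparkSession `spark` with a Paimon catalog configured):

    // With v2 write enabled and dynamic partition overwrite mode, INSERT
    // OVERWRITE replaces only the partitions present in the incoming data.
    spark.conf.set("spark.paimon.write.use-v2-write", "true")
    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

    spark.sql("CREATE TABLE my_table (id INT, pt STRING) PARTITIONED BY (pt)")
    spark.sql("INSERT OVERWRITE my_table VALUES (1, 'p1'), (2, 'p2')")
    spark.sql("INSERT OVERWRITE my_table VALUES (3, 'p1')")
    // dynamic mode rewrites only pt='p1': result is (2, 'p2'), (3, 'p1');
    // static mode would have left only (3, 'p1').
    spark.sql("SELECT * FROM my_table ORDER BY id").show()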
