This is an automated email from the ASF dual-hosted git repository.

chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 971f590752 [GLUTEN-10648][VL] Support Iceberg overwrite partitions 
dynamic (#10823)
971f590752 is described below

commit 971f5907528b669a548cf13e3d06e353dc079123
Author: Zouxxyy <[email protected]>
AuthorDate: Wed Oct 1 17:15:51 2025 +0800

    [GLUTEN-10648][VL] Support Iceberg overwrite partitions dynamic (#10823)
---
 .../execution/VeloxIcebergAppendDataExec.scala     |  1 -
 ...loxIcebergOverwritePartitionsDynamicExec.scala} | 14 ++++++-----
 .../gluten/extension/OffloadIcebergWrite.scala     | 29 ++++++++++++++++------
 .../execution/enhanced/VeloxIcebergSuite.scala     | 26 ++++++++++++++++++-
 .../gluten/backendsapi/velox/VeloxBackend.scala    | 11 ++++----
 docs/Configuration.md                              |  1 +
 .../apache/gluten/execution/IcebergWriteExec.scala |  5 ++--
 .../gluten/backendsapi/BackendSettingsApi.scala    |  2 ++
 .../org/apache/gluten/config/GlutenConfig.scala    |  9 +++++++
 .../extension/columnar/validator/Validators.scala  |  6 ++++-
 10 files changed, 79 insertions(+), 25 deletions(-)

diff --git 
a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/VeloxIcebergAppendDataExec.scala
 
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/VeloxIcebergAppendDataExec.scala
index 013df1e476..50b5c1171c 100644
--- 
a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/VeloxIcebergAppendDataExec.scala
+++ 
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/VeloxIcebergAppendDataExec.scala
@@ -35,5 +35,4 @@ object VeloxIcebergAppendDataExec {
       original.write
     )
   }
-
 }
diff --git 
a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/VeloxIcebergAppendDataExec.scala
 
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/VeloxIcebergOverwritePartitionsDynamicExec.scala
similarity index 75%
copy from 
backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/VeloxIcebergAppendDataExec.scala
copy to 
backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/VeloxIcebergOverwritePartitionsDynamicExec.scala
index 013df1e476..fc78642f33 100644
--- 
a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/VeloxIcebergAppendDataExec.scala
+++ 
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/VeloxIcebergOverwritePartitionsDynamicExec.scala
@@ -18,22 +18,24 @@ package org.apache.gluten.execution
 
 import org.apache.spark.sql.connector.write.Write
 import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.datasources.v2._
+import 
org.apache.spark.sql.execution.datasources.v2.OverwritePartitionsDynamicExec
 
-case class VeloxIcebergAppendDataExec(query: SparkPlan, refreshCache: () => 
Unit, write: Write)
+case class VeloxIcebergOverwritePartitionsDynamicExec(
+    query: SparkPlan,
+    refreshCache: () => Unit,
+    write: Write)
   extends AbstractIcebergWriteExec {
 
   override protected def withNewChildInternal(newChild: SparkPlan): 
IcebergWriteExec =
     copy(query = newChild)
 }
 
-object VeloxIcebergAppendDataExec {
-  def apply(original: AppendDataExec): IcebergWriteExec = {
-    VeloxIcebergAppendDataExec(
+object VeloxIcebergOverwritePartitionsDynamicExec {
+  def apply(original: OverwritePartitionsDynamicExec): IcebergWriteExec = {
+    VeloxIcebergOverwritePartitionsDynamicExec(
       original.query,
       original.refreshCache,
       original.write
     )
   }
-
 }
diff --git 
a/backends-velox/src-iceberg/main/scala/org/apache/gluten/extension/OffloadIcebergWrite.scala
 
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/extension/OffloadIcebergWrite.scala
index 98135fae49..67b701f771 100644
--- 
a/backends-velox/src-iceberg/main/scala/org/apache/gluten/extension/OffloadIcebergWrite.scala
+++ 
b/backends-velox/src-iceberg/main/scala/org/apache/gluten/extension/OffloadIcebergWrite.scala
@@ -17,7 +17,7 @@
 package org.apache.gluten.extension
 
 import org.apache.gluten.config.GlutenConfig
-import org.apache.gluten.execution.{VeloxIcebergAppendDataExec, 
VeloxIcebergOverwriteByExpressionExec, VeloxIcebergReplaceDataExec}
+import org.apache.gluten.execution.{VeloxIcebergAppendDataExec, 
VeloxIcebergOverwriteByExpressionExec, 
VeloxIcebergOverwritePartitionsDynamicExec, VeloxIcebergReplaceDataExec}
 import org.apache.gluten.extension.columnar.enumerated.RasOffload
 import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
 import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
@@ -25,9 +25,9 @@ import 
org.apache.gluten.extension.columnar.validator.Validators
 import org.apache.gluten.extension.injector.Injector
 
 import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.datasources.v2.{AppendDataExec, 
OverwriteByExpressionExec, ReplaceDataExec}
+import org.apache.spark.sql.execution.datasources.v2.{AppendDataExec, 
OverwriteByExpressionExec, OverwritePartitionsDynamicExec, ReplaceDataExec}
 
-case class OffloadIcebergWrite() extends OffloadSingleNode {
+case class OffloadIcebergAppend() extends OffloadSingleNode {
   override def offload(plan: SparkPlan): SparkPlan = plan match {
     case a: AppendDataExec =>
       VeloxIcebergAppendDataExec(a)
@@ -35,7 +35,7 @@ case class OffloadIcebergWrite() extends OffloadSingleNode {
   }
 }
 
-case class OffloadIcebergDelete() extends OffloadSingleNode {
+case class OffloadIcebergReplaceData() extends OffloadSingleNode {
   override def offload(plan: SparkPlan): SparkPlan = plan match {
     case r: ReplaceDataExec =>
       VeloxIcebergReplaceDataExec(r)
@@ -51,12 +51,24 @@ case class OffloadIcebergOverwrite() extends 
OffloadSingleNode {
   }
 }
 
+case class OffloadIcebergOverwritePartitionsDynamic() extends 
OffloadSingleNode {
+  override def offload(plan: SparkPlan): SparkPlan = plan match {
+    case r: OverwritePartitionsDynamicExec =>
+      VeloxIcebergOverwritePartitionsDynamicExec(r)
+    case other => other
+  }
+}
+
 object OffloadIcebergWrite {
   def inject(injector: Injector): Unit = {
     // Inject legacy rule.
     injector.gluten.legacy.injectTransform {
       c =>
-        val offload = Seq(OffloadIcebergWrite(), OffloadIcebergDelete(), 
OffloadIcebergOverwrite())
+        val offload = Seq(
+          OffloadIcebergAppend(),
+          OffloadIcebergReplaceData(),
+          OffloadIcebergOverwrite(),
+          OffloadIcebergOverwritePartitionsDynamic())
         HeuristicTransform.Simple(
           Validators.newValidator(new GlutenConfig(c.sqlConf), offload),
           offload
@@ -64,9 +76,10 @@ object OffloadIcebergWrite {
     }
 
     val offloads: Seq[RasOffload] = Seq(
-      RasOffload.from[AppendDataExec](OffloadIcebergWrite()),
-      RasOffload.from[ReplaceDataExec](OffloadIcebergDelete()),
-      RasOffload.from[OverwriteByExpressionExec](OffloadIcebergOverwrite())
+      RasOffload.from[AppendDataExec](OffloadIcebergAppend()),
+      RasOffload.from[ReplaceDataExec](OffloadIcebergReplaceData()),
+      RasOffload.from[OverwriteByExpressionExec](OffloadIcebergOverwrite()),
+      
RasOffload.from[OverwritePartitionsDynamicExec](OffloadIcebergOverwritePartitionsDynamic())
     )
     offloads.foreach(
       offload =>
diff --git 
a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
 
b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
index b7270e69a0..6702c77885 100644
--- 
a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
+++ 
b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.execution.enhanced
 
-import org.apache.gluten.execution.{ColumnarToRowExecBase, IcebergSuite, 
VeloxIcebergAppendDataExec, VeloxIcebergOverwriteByExpressionExec, 
VeloxIcebergReplaceDataExec}
+import org.apache.gluten.execution.{ColumnarToRowExecBase, IcebergSuite, 
VeloxIcebergAppendDataExec, VeloxIcebergOverwriteByExpressionExec, 
VeloxIcebergOverwritePartitionsDynamicExec, VeloxIcebergReplaceDataExec}
 import org.apache.gluten.tags.EnhancedFeaturesTest
 
 import org.apache.spark.sql.{DataFrame, Row}
@@ -259,4 +259,28 @@ class VeloxIcebergSuite extends IcebergSuite {
       checkColumnarToRow(df, 0)
     }
   }
+
+  test("iceberg dynamic insert overwrite partition") {
+    withTable("iceberg_tbl") {
+      spark.sql("""
+                  |create table if not exists iceberg_tbl (a int, pt int) 
using iceberg
+                  |partitioned by (pt)
+                  |""".stripMargin)
+
+      spark.sql("insert into table iceberg_tbl values (1, 1), (2, 2)")
+
+      withSQLConf("spark.sql.sources.partitionOverwriteMode" -> "dynamic") {
+        val df = spark.sql("insert overwrite table iceberg_tbl values (11, 1)")
+        assert(
+          df.queryExecution.executedPlan
+            .asInstanceOf[CommandResultExec]
+            .commandPhysicalPlan
+            .isInstanceOf[VeloxIcebergOverwritePartitionsDynamicExec])
+        checkAnswer(
+          spark.sql("select * from iceberg_tbl order by pt"),
+          Seq(Row(11, 1), Row(2, 2))
+        )
+      }
+    }
+  }
 }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index a410f15f89..3967ab68d4 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -551,12 +551,11 @@ object VeloxBackendSettings extends BackendSettingsApi {
 
   override def enableEnhancedFeatures(): Boolean = 
VeloxConfig.get.enableEnhancedFeatures()
 
-  override def supportAppendDataExec(): Boolean =
-    GlutenConfig.get.enableAppendData && enableEnhancedFeatures()
+  override def supportAppendDataExec(): Boolean = enableEnhancedFeatures()
 
-  override def supportReplaceDataExec(): Boolean =
-    GlutenConfig.get.enableReplaceData && enableEnhancedFeatures()
+  override def supportReplaceDataExec(): Boolean = enableEnhancedFeatures()
 
-  override def supportOverwriteByExpression(): Boolean =
-    GlutenConfig.get.enableOverwriteByExpression && enableEnhancedFeatures()
+  override def supportOverwriteByExpression(): Boolean = 
enableEnhancedFeatures()
+
+  override def supportOverwritePartitionsDynamic(): Boolean = 
enableEnhancedFeatures()
 }
diff --git a/docs/Configuration.md b/docs/Configuration.md
index 9aacdb3777..c3d0c396fd 100644
--- a/docs/Configuration.md
+++ b/docs/Configuration.md
@@ -74,6 +74,7 @@ nav_order: 15
 | spark.gluten.sql.columnar.limit                                    | true    
          |
 | spark.gluten.sql.columnar.maxBatchSize                             | 4096    
          |
 | spark.gluten.sql.columnar.overwriteByExpression                    | true    
          | Enable or disable columnar v2 command overwrite by expression.      
                                                                                
                                                                                
                                                                                
                                                           |
+| spark.gluten.sql.columnar.overwritePartitionsDynamic               | true    
          | Enable or disable columnar v2 command overwrite partitions dynamic. 
                                                                                
                                                                                
                                                                                
                                                           |
 | spark.gluten.sql.columnar.parquet.write.blockSize                  | 128MB   
          |
 | spark.gluten.sql.columnar.partial.generate                         | true    
          | Evaluates the non-offload-able HiveUDTF using vanilla Spark 
generator                                                                       
                                                                                
                                                                                
                                                                   |
 | spark.gluten.sql.columnar.partial.project                          | true    
          | Break up one project node into 2 phases when some of the 
expressions are non offload-able. Phase one is a regular offloaded project 
transformer that evaluates the offload-able expressions in native, phase two 
preserves the output from phase one and evaluates the remaining 
non-offload-able expressions using vanilla Spark projections                    
              |
diff --git 
a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergWriteExec.scala
 
b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergWriteExec.scala
index 5565ccbf35..a3346468df 100644
--- 
a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergWriteExec.scala
+++ 
b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergWriteExec.scala
@@ -66,8 +66,9 @@ trait IcebergWriteExec extends ColumnarV2TableWriteExec {
     if (IcebergWriteUtil.hasUnsupportedDataType(write)) {
       return ValidationResult.failed("Contains UUID or FIXED data type")
     }
-    if 
(BackendsApiManager.getValidatorApiInstance.doSchemaValidate(query.schema).isDefined)
 {
-      return ValidationResult.failed("Contains unsupported data type")
+    BackendsApiManager.getValidatorApiInstance.doSchemaValidate(query.schema) 
match {
+      case Some(reason) => return ValidationResult.failed(reason)
+      case None =>
     }
     val spec = IcebergWriteUtil.getTable(write).spec()
     if (spec.isPartitioned) {
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index 72caa1203e..eea34e8c44 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -161,4 +161,6 @@ trait BackendSettingsApi {
   def supportReplaceDataExec(): Boolean = false
 
   def supportOverwriteByExpression(): Boolean = false
+
+  def supportOverwritePartitionsDynamic(): Boolean = false
 }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala 
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index 6e686210ef..aeecbdb198 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -90,6 +90,9 @@ class GlutenConfig(conf: SQLConf) extends 
GlutenCoreConfig(conf) {
 
   def enableOverwriteByExpression: Boolean = 
getConf(COLUMNAR_OVERWRIET_BY_EXPRESSION_ENABLED)
 
+  def enableOverwritePartitionsDynamic: Boolean =
+    getConf(COLUMNAR_OVERWRIET_PARTITIONS_DYNAMIC_ENABLED)
+
   def enableColumnarShuffledHashJoin: Boolean = 
getConf(COLUMNAR_SHUFFLED_HASH_JOIN_ENABLED)
 
   def shuffledHashJoinOptimizeBuildSide: Boolean =
@@ -864,6 +867,12 @@ object GlutenConfig {
       .booleanConf
       .createWithDefault(true)
 
+  val COLUMNAR_OVERWRIET_PARTITIONS_DYNAMIC_ENABLED =
+    buildConf("spark.gluten.sql.columnar.overwritePartitionsDynamic")
+      .doc("Enable or disable columnar v2 command overwrite partitions 
dynamic.")
+      .booleanConf
+      .createWithDefault(true)
+
   val COLUMNAR_PREFER_STREAMING_AGGREGATE =
     buildConf("spark.gluten.sql.columnar.preferStreamingAggregate")
       .doc(
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
index 2eda0ee12c..14d4a7ffcf 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
@@ -28,7 +28,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, 
ObjectHashAggregateExec, SortAggregateExec}
 import org.apache.spark.sql.execution.datasources.WriteFilesExec
-import org.apache.spark.sql.execution.datasources.v2.{AppendDataExec, 
BatchScanExec, OverwriteByExpressionExec, ReplaceDataExec}
+import org.apache.spark.sql.execution.datasources.v2.{AppendDataExec, 
BatchScanExec, OverwriteByExpressionExec, OverwritePartitionsDynamicExec, 
ReplaceDataExec}
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, 
ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.execution.window.WindowExec
@@ -138,6 +138,8 @@ object Validators {
       case p: AppendDataExec if !settings.supportAppendDataExec() => fail(p)
       case p: ReplaceDataExec if !settings.supportReplaceDataExec() => fail(p)
       case p: OverwriteByExpressionExec if 
!settings.supportOverwriteByExpression() => fail(p)
+      case p: OverwritePartitionsDynamicExec if 
!settings.supportOverwritePartitionsDynamic() =>
+        fail(p)
       case _ => pass()
     }
   }
@@ -160,6 +162,8 @@ object Validators {
       case p: AppendDataExec if !glutenConf.enableAppendData => fail(p)
       case p: ReplaceDataExec if !glutenConf.enableReplaceData => fail(p)
       case p: OverwriteByExpressionExec if 
!glutenConf.enableOverwriteByExpression => fail(p)
+      case p: OverwritePartitionsDynamicExec if 
!glutenConf.enableOverwritePartitionsDynamic =>
+        fail(p)
       case p @ (_: LocalLimitExec | _: GlobalLimitExec) if 
!glutenConf.enableColumnarLimit =>
         fail(p)
       case p: GenerateExec if !glutenConf.enableColumnarGenerate => fail(p)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to