This is an automated email from the ASF dual-hosted git repository.
chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 971f590752 [GLUTEN-10648][VL] Support Iceberg overwrite partitions dynamic (#10823)
971f590752 is described below
commit 971f5907528b669a548cf13e3d06e353dc079123
Author: Zouxxyy <[email protected]>
AuthorDate: Wed Oct 1 17:15:51 2025 +0800
[GLUTEN-10648][VL] Support Iceberg overwrite partitions dynamic (#10823)
---
.../execution/VeloxIcebergAppendDataExec.scala | 1 -
...loxIcebergOverwritePartitionsDynamicExec.scala} | 14 ++++++-----
.../gluten/extension/OffloadIcebergWrite.scala | 29 ++++++++++++++++------
.../execution/enhanced/VeloxIcebergSuite.scala | 26 ++++++++++++++++++-
.../gluten/backendsapi/velox/VeloxBackend.scala | 11 ++++----
docs/Configuration.md | 1 +
.../apache/gluten/execution/IcebergWriteExec.scala | 5 ++--
.../gluten/backendsapi/BackendSettingsApi.scala | 2 ++
.../org/apache/gluten/config/GlutenConfig.scala | 9 +++++++
.../extension/columnar/validator/Validators.scala | 6 ++++-
10 files changed, 79 insertions(+), 25 deletions(-)
diff --git a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/VeloxIcebergAppendDataExec.scala b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/VeloxIcebergAppendDataExec.scala
index 013df1e476..50b5c1171c 100644
--- a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/VeloxIcebergAppendDataExec.scala
+++ b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/VeloxIcebergAppendDataExec.scala
@@ -35,5 +35,4 @@ object VeloxIcebergAppendDataExec {
original.write
)
}
-
}
diff --git a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/VeloxIcebergAppendDataExec.scala b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/VeloxIcebergOverwritePartitionsDynamicExec.scala
similarity index 75%
copy from backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/VeloxIcebergAppendDataExec.scala
copy to backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/VeloxIcebergOverwritePartitionsDynamicExec.scala
index 013df1e476..fc78642f33 100644
--- a/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/VeloxIcebergAppendDataExec.scala
+++ b/backends-velox/src-iceberg/main/scala/org/apache/gluten/execution/VeloxIcebergOverwritePartitionsDynamicExec.scala
@@ -18,22 +18,24 @@ package org.apache.gluten.execution
import org.apache.spark.sql.connector.write.Write
import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.datasources.v2._
+import
org.apache.spark.sql.execution.datasources.v2.OverwritePartitionsDynamicExec
-case class VeloxIcebergAppendDataExec(query: SparkPlan, refreshCache: () =>
Unit, write: Write)
+case class VeloxIcebergOverwritePartitionsDynamicExec(
+ query: SparkPlan,
+ refreshCache: () => Unit,
+ write: Write)
extends AbstractIcebergWriteExec {
override protected def withNewChildInternal(newChild: SparkPlan):
IcebergWriteExec =
copy(query = newChild)
}
-object VeloxIcebergAppendDataExec {
- def apply(original: AppendDataExec): IcebergWriteExec = {
- VeloxIcebergAppendDataExec(
+object VeloxIcebergOverwritePartitionsDynamicExec {
+ def apply(original: OverwritePartitionsDynamicExec): IcebergWriteExec = {
+ VeloxIcebergOverwritePartitionsDynamicExec(
original.query,
original.refreshCache,
original.write
)
}
-
}
diff --git a/backends-velox/src-iceberg/main/scala/org/apache/gluten/extension/OffloadIcebergWrite.scala b/backends-velox/src-iceberg/main/scala/org/apache/gluten/extension/OffloadIcebergWrite.scala
index 98135fae49..67b701f771 100644
--- a/backends-velox/src-iceberg/main/scala/org/apache/gluten/extension/OffloadIcebergWrite.scala
+++ b/backends-velox/src-iceberg/main/scala/org/apache/gluten/extension/OffloadIcebergWrite.scala
@@ -17,7 +17,7 @@
package org.apache.gluten.extension
import org.apache.gluten.config.GlutenConfig
-import org.apache.gluten.execution.{VeloxIcebergAppendDataExec,
VeloxIcebergOverwriteByExpressionExec, VeloxIcebergReplaceDataExec}
+import org.apache.gluten.execution.{VeloxIcebergAppendDataExec,
VeloxIcebergOverwriteByExpressionExec,
VeloxIcebergOverwritePartitionsDynamicExec, VeloxIcebergReplaceDataExec}
import org.apache.gluten.extension.columnar.enumerated.RasOffload
import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
@@ -25,9 +25,9 @@ import org.apache.gluten.extension.columnar.validator.Validators
import org.apache.gluten.extension.injector.Injector
import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.datasources.v2.{AppendDataExec,
OverwriteByExpressionExec, ReplaceDataExec}
+import org.apache.spark.sql.execution.datasources.v2.{AppendDataExec,
OverwriteByExpressionExec, OverwritePartitionsDynamicExec, ReplaceDataExec}
-case class OffloadIcebergWrite() extends OffloadSingleNode {
+case class OffloadIcebergAppend() extends OffloadSingleNode {
override def offload(plan: SparkPlan): SparkPlan = plan match {
case a: AppendDataExec =>
VeloxIcebergAppendDataExec(a)
@@ -35,7 +35,7 @@ case class OffloadIcebergWrite() extends OffloadSingleNode {
}
}
-case class OffloadIcebergDelete() extends OffloadSingleNode {
+case class OffloadIcebergReplaceData() extends OffloadSingleNode {
override def offload(plan: SparkPlan): SparkPlan = plan match {
case r: ReplaceDataExec =>
VeloxIcebergReplaceDataExec(r)
@@ -51,12 +51,24 @@ case class OffloadIcebergOverwrite() extends OffloadSingleNode {
}
}
+case class OffloadIcebergOverwritePartitionsDynamic() extends
OffloadSingleNode {
+ override def offload(plan: SparkPlan): SparkPlan = plan match {
+ case r: OverwritePartitionsDynamicExec =>
+ VeloxIcebergOverwritePartitionsDynamicExec(r)
+ case other => other
+ }
+}
+
object OffloadIcebergWrite {
def inject(injector: Injector): Unit = {
// Inject legacy rule.
injector.gluten.legacy.injectTransform {
c =>
- val offload = Seq(OffloadIcebergWrite(), OffloadIcebergDelete(),
OffloadIcebergOverwrite())
+ val offload = Seq(
+ OffloadIcebergAppend(),
+ OffloadIcebergReplaceData(),
+ OffloadIcebergOverwrite(),
+ OffloadIcebergOverwritePartitionsDynamic())
HeuristicTransform.Simple(
Validators.newValidator(new GlutenConfig(c.sqlConf), offload),
offload
@@ -64,9 +76,10 @@ object OffloadIcebergWrite {
}
val offloads: Seq[RasOffload] = Seq(
- RasOffload.from[AppendDataExec](OffloadIcebergWrite()),
- RasOffload.from[ReplaceDataExec](OffloadIcebergDelete()),
- RasOffload.from[OverwriteByExpressionExec](OffloadIcebergOverwrite())
+ RasOffload.from[AppendDataExec](OffloadIcebergAppend()),
+ RasOffload.from[ReplaceDataExec](OffloadIcebergReplaceData()),
+ RasOffload.from[OverwriteByExpressionExec](OffloadIcebergOverwrite()),
+
RasOffload.from[OverwritePartitionsDynamicExec](OffloadIcebergOverwritePartitionsDynamic())
)
offloads.foreach(
offload =>
diff --git a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
index b7270e69a0..6702c77885 100644
--- a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
+++ b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.execution.enhanced
-import org.apache.gluten.execution.{ColumnarToRowExecBase, IcebergSuite,
VeloxIcebergAppendDataExec, VeloxIcebergOverwriteByExpressionExec,
VeloxIcebergReplaceDataExec}
+import org.apache.gluten.execution.{ColumnarToRowExecBase, IcebergSuite,
VeloxIcebergAppendDataExec, VeloxIcebergOverwriteByExpressionExec,
VeloxIcebergOverwritePartitionsDynamicExec, VeloxIcebergReplaceDataExec}
import org.apache.gluten.tags.EnhancedFeaturesTest
import org.apache.spark.sql.{DataFrame, Row}
@@ -259,4 +259,28 @@ class VeloxIcebergSuite extends IcebergSuite {
checkColumnarToRow(df, 0)
}
}
+
+ test("iceberg dynamic insert overwrite partition") {
+ withTable("iceberg_tbl") {
+ spark.sql("""
+ |create table if not exists iceberg_tbl (a int, pt int)
using iceberg
+ |partitioned by (pt)
+ |""".stripMargin)
+
+ spark.sql("insert into table iceberg_tbl values (1, 1), (2, 2)")
+
+ withSQLConf("spark.sql.sources.partitionOverwriteMode" -> "dynamic") {
+ val df = spark.sql("insert overwrite table iceberg_tbl values (11, 1)")
+ assert(
+ df.queryExecution.executedPlan
+ .asInstanceOf[CommandResultExec]
+ .commandPhysicalPlan
+ .isInstanceOf[VeloxIcebergOverwritePartitionsDynamicExec])
+ checkAnswer(
+ spark.sql("select * from iceberg_tbl order by pt"),
+ Seq(Row(11, 1), Row(2, 2))
+ )
+ }
+ }
+ }
}
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index a410f15f89..3967ab68d4 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -551,12 +551,11 @@ object VeloxBackendSettings extends BackendSettingsApi {
override def enableEnhancedFeatures(): Boolean =
VeloxConfig.get.enableEnhancedFeatures()
- override def supportAppendDataExec(): Boolean =
- GlutenConfig.get.enableAppendData && enableEnhancedFeatures()
+ override def supportAppendDataExec(): Boolean = enableEnhancedFeatures()
- override def supportReplaceDataExec(): Boolean =
- GlutenConfig.get.enableReplaceData && enableEnhancedFeatures()
+ override def supportReplaceDataExec(): Boolean = enableEnhancedFeatures()
- override def supportOverwriteByExpression(): Boolean =
- GlutenConfig.get.enableOverwriteByExpression && enableEnhancedFeatures()
+ override def supportOverwriteByExpression(): Boolean =
enableEnhancedFeatures()
+
+ override def supportOverwritePartitionsDynamic(): Boolean =
enableEnhancedFeatures()
}
diff --git a/docs/Configuration.md b/docs/Configuration.md
index 9aacdb3777..c3d0c396fd 100644
--- a/docs/Configuration.md
+++ b/docs/Configuration.md
@@ -74,6 +74,7 @@ nav_order: 15
| spark.gluten.sql.columnar.limit | true
|
| spark.gluten.sql.columnar.maxBatchSize | 4096
|
| spark.gluten.sql.columnar.overwriteByExpression | true
| Enable or disable columnar v2 command overwrite by expression.
|
+| spark.gluten.sql.columnar.overwritePartitionsDynamic | true
| Enable or disable columnar v2 command overwrite partitions dynamic.
|
| spark.gluten.sql.columnar.parquet.write.blockSize | 128MB
|
| spark.gluten.sql.columnar.partial.generate | true
| Evaluates the non-offload-able HiveUDTF using vanilla Spark
generator
|
| spark.gluten.sql.columnar.partial.project | true
| Break up one project node into 2 phases when some of the
expressions are non offload-able. Phase one is a regular offloaded project
transformer that evaluates the offload-able expressions in native, phase two
preserves the output from phase one and evaluates the remaining
non-offload-able expressions using vanilla Spark projections
|
diff --git a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergWriteExec.scala b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergWriteExec.scala
index 5565ccbf35..a3346468df 100644
--- a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergWriteExec.scala
+++ b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergWriteExec.scala
@@ -66,8 +66,9 @@ trait IcebergWriteExec extends ColumnarV2TableWriteExec {
if (IcebergWriteUtil.hasUnsupportedDataType(write)) {
return ValidationResult.failed("Contains UUID ot FIXED data type")
}
- if
(BackendsApiManager.getValidatorApiInstance.doSchemaValidate(query.schema).isDefined)
{
- return ValidationResult.failed("Contains unsupported data type")
+ BackendsApiManager.getValidatorApiInstance.doSchemaValidate(query.schema)
match {
+ case Some(reason) => return ValidationResult.failed(reason)
+ case None =>
}
val spec = IcebergWriteUtil.getTable(write).spec()
if (spec.isPartitioned) {
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index 72caa1203e..eea34e8c44 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -161,4 +161,6 @@ trait BackendSettingsApi {
def supportReplaceDataExec(): Boolean = false
def supportOverwriteByExpression(): Boolean = false
+
+ def supportOverwritePartitionsDynamic(): Boolean = false
}
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
index 6e686210ef..aeecbdb198 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/config/GlutenConfig.scala
@@ -90,6 +90,9 @@ class GlutenConfig(conf: SQLConf) extends GlutenCoreConfig(conf) {
def enableOverwriteByExpression: Boolean =
getConf(COLUMNAR_OVERWRIET_BY_EXPRESSION_ENABLED)
+ def enableOverwritePartitionsDynamic: Boolean =
+ getConf(COLUMNAR_OVERWRIET_PARTITIONS_DYNAMIC_ENABLED)
+
def enableColumnarShuffledHashJoin: Boolean =
getConf(COLUMNAR_SHUFFLED_HASH_JOIN_ENABLED)
def shuffledHashJoinOptimizeBuildSide: Boolean =
@@ -864,6 +867,12 @@ object GlutenConfig {
.booleanConf
.createWithDefault(true)
+  val COLUMNAR_OVERWRIET_PARTITIONS_DYNAMIC_ENABLED =
+    buildConf("spark.gluten.sql.columnar.overwritePartitionsDynamic")
+      .doc("Enable or disable columnar v2 command overwrite partitions dynamic.")
+      .booleanConf
+      .createWithDefault(true)
+
val COLUMNAR_PREFER_STREAMING_AGGREGATE =
buildConf("spark.gluten.sql.columnar.preferStreamingAggregate")
.doc(
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
index 2eda0ee12c..14d4a7ffcf 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
@@ -28,7 +28,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec,
ObjectHashAggregateExec, SortAggregateExec}
import org.apache.spark.sql.execution.datasources.WriteFilesExec
-import org.apache.spark.sql.execution.datasources.v2.{AppendDataExec,
BatchScanExec, OverwriteByExpressionExec, ReplaceDataExec}
+import org.apache.spark.sql.execution.datasources.v2.{AppendDataExec,
BatchScanExec, OverwriteByExpressionExec, OverwritePartitionsDynamicExec,
ReplaceDataExec}
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec,
ShuffleExchangeExec}
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.execution.window.WindowExec
@@ -138,6 +138,8 @@ object Validators {
case p: AppendDataExec if !settings.supportAppendDataExec() => fail(p)
case p: ReplaceDataExec if !settings.supportReplaceDataExec() => fail(p)
case p: OverwriteByExpressionExec if
!settings.supportOverwriteByExpression() => fail(p)
+ case p: OverwritePartitionsDynamicExec if
!settings.supportOverwritePartitionsDynamic() =>
+ fail(p)
case _ => pass()
}
}
@@ -160,6 +162,8 @@ object Validators {
case p: AppendDataExec if !glutenConf.enableAppendData => fail(p)
case p: ReplaceDataExec if !glutenConf.enableReplaceData => fail(p)
case p: OverwriteByExpressionExec if
!glutenConf.enableOverwriteByExpression => fail(p)
+ case p: OverwritePartitionsDynamicExec if
!glutenConf.enableOverwritePartitionsDynamic =>
+ fail(p)
case p @ (_: LocalLimitExec | _: GlobalLimitExec) if
!glutenConf.enableColumnarLimit =>
fail(p)
case p: GenerateExec if !glutenConf.enableColumnarGenerate => fail(p)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]