This is an automated email from the ASF dual-hosted git repository.
philo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new c13833e3a9 [VL] Validate Iceberg write support before executing offload (#10900)
c13833e3a9 is described below
commit c13833e3a9611a9af733069a4f299a0798b1ed8d
Author: Zouxxyy <[email protected]>
AuthorDate: Thu Oct 23 11:05:24 2025 +0800
[VL] Validate Iceberg write support before executing offload (#10900)
---
.../org/apache/gluten/extension/OffloadIcebergWrite.scala | 10 ++++++----
.../main/scala/org/apache/spark/util/SparkReflectionUtil.scala | 9 +++++++++
.../scala/org/apache/gluten/execution/IcebergWriteExec.scala | 2 +-
.../org/apache/iceberg/spark/source/IcebergWriteUtil.scala | 5 +++--
4 files changed, 19 insertions(+), 7 deletions(-)
diff --git a/backends-velox/src-iceberg/main/scala/org/apache/gluten/extension/OffloadIcebergWrite.scala b/backends-velox/src-iceberg/main/scala/org/apache/gluten/extension/OffloadIcebergWrite.scala
index 67b701f771..1b00c1a788 100644
--- a/backends-velox/src-iceberg/main/scala/org/apache/gluten/extension/OffloadIcebergWrite.scala
+++ b/backends-velox/src-iceberg/main/scala/org/apache/gluten/extension/OffloadIcebergWrite.scala
@@ -27,9 +27,11 @@ import org.apache.gluten.extension.injector.Injector
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.v2.{AppendDataExec,
OverwriteByExpressionExec, OverwritePartitionsDynamicExec, ReplaceDataExec}
+import org.apache.iceberg.spark.source.IcebergWriteUtil.supportsWrite
+
case class OffloadIcebergAppend() extends OffloadSingleNode {
override def offload(plan: SparkPlan): SparkPlan = plan match {
- case a: AppendDataExec =>
+ case a: AppendDataExec if supportsWrite(a.write) =>
VeloxIcebergAppendDataExec(a)
case other => other
}
@@ -37,7 +39,7 @@ case class OffloadIcebergAppend() extends OffloadSingleNode {
case class OffloadIcebergReplaceData() extends OffloadSingleNode {
override def offload(plan: SparkPlan): SparkPlan = plan match {
- case r: ReplaceDataExec =>
+ case r: ReplaceDataExec if supportsWrite(r.write) =>
VeloxIcebergReplaceDataExec(r)
case other => other
}
@@ -45,7 +47,7 @@ case class OffloadIcebergReplaceData() extends OffloadSingleNode {
case class OffloadIcebergOverwrite() extends OffloadSingleNode {
override def offload(plan: SparkPlan): SparkPlan = plan match {
- case r: OverwriteByExpressionExec =>
+ case r: OverwriteByExpressionExec if supportsWrite(r.write) =>
VeloxIcebergOverwriteByExpressionExec(r)
case other => other
}
@@ -53,7 +55,7 @@ case class OffloadIcebergOverwrite() extends OffloadSingleNode {
case class OffloadIcebergOverwritePartitionsDynamic() extends
OffloadSingleNode {
override def offload(plan: SparkPlan): SparkPlan = plan match {
- case r: OverwritePartitionsDynamicExec =>
+ case r: OverwritePartitionsDynamicExec if supportsWrite(r.write) =>
VeloxIcebergOverwritePartitionsDynamicExec(r)
case other => other
}
diff --git a/gluten-core/src/main/scala/org/apache/spark/util/SparkReflectionUtil.scala b/gluten-core/src/main/scala/org/apache/spark/util/SparkReflectionUtil.scala
index ed243abb7f..60a15c8be9 100644
--- a/gluten-core/src/main/scala/org/apache/spark/util/SparkReflectionUtil.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/util/SparkReflectionUtil.scala
@@ -27,4 +27,13 @@ object SparkReflectionUtil {
noSparkClassLoader: Boolean = false): Class[C] = {
Utils.classForName(className, initialize, noSparkClassLoader)
}
+
+ def isInstanceOfClassName(obj: Any, className: String): Boolean = {
+ try {
+ val cls = classForName(className)
+ cls.isInstance(obj)
+ } catch {
+ case _: ClassNotFoundException => false
+ }
+ }
}
diff --git a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergWriteExec.scala b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergWriteExec.scala
index f06eda3c14..0d283b7556 100644
--- a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergWriteExec.scala
+++ b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergWriteExec.scala
@@ -60,7 +60,7 @@ trait IcebergWriteExec extends ColumnarV2TableWriteExec {
}
override def doValidateInternal(): ValidationResult = {
- if (!IcebergWriteUtil.isDataWrite(write)) {
+ if (!IcebergWriteUtil.supportsWrite(write)) {
return ValidationResult.failed(s"Not support the write
${write.getClass.getSimpleName}")
}
if (IcebergWriteUtil.hasUnsupportedDataType(write)) {
diff --git a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergWriteUtil.scala b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergWriteUtil.scala
index 12507af996..1d15259732 100644
--- a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergWriteUtil.scala
+++ b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergWriteUtil.scala
@@ -17,6 +17,7 @@
package org.apache.iceberg.spark.source
import org.apache.spark.sql.connector.write.{Write, WriterCommitMessage}
+import org.apache.spark.util.SparkReflectionUtil
import org.apache.iceberg._
import org.apache.iceberg.spark.SparkWriteConf
@@ -27,8 +28,8 @@ import org.apache.iceberg.types.Types.{ListType, MapType}
object IcebergWriteUtil {
- def isDataWrite(write: Write): Boolean = {
- write.isInstanceOf[SparkWrite]
+ def supportsWrite(write: Write): Boolean = {
+ SparkReflectionUtil.isInstanceOfClassName(write,
"org.apache.iceberg.spark.source.SparkWrite")
}
def hasUnsupportedDataType(write: Write): Boolean = {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]