Zouxxyy commented on code in PR #10900:
URL: 
https://github.com/apache/incubator-gluten/pull/10900#discussion_r2435929668


##########
backends-velox/src-iceberg/main/scala/org/apache/gluten/extension/OffloadIcebergWrite.scala:
##########
@@ -18,48 +18,56 @@ package org.apache.gluten.extension
 
 import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.execution.{VeloxIcebergAppendDataExec, 
VeloxIcebergOverwriteByExpressionExec, 
VeloxIcebergOverwritePartitionsDynamicExec, VeloxIcebergReplaceDataExec}
+import org.apache.gluten.extension.OffloadIcebergWrite._
 import org.apache.gluten.extension.columnar.enumerated.RasOffload
 import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
 import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
 import org.apache.gluten.extension.columnar.validator.Validators
 import org.apache.gluten.extension.injector.Injector
 
+import org.apache.spark.sql.connector.write.Write
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources.v2.{AppendDataExec, 
OverwriteByExpressionExec, OverwritePartitionsDynamicExec, ReplaceDataExec}
+import org.apache.spark.util.SparkReflectionUtil
 
 case class OffloadIcebergAppend() extends OffloadSingleNode {
   override def offload(plan: SparkPlan): SparkPlan = plan match {
-    case a: AppendDataExec =>
+    case a: AppendDataExec if supportsWrite(a.write) =>
       VeloxIcebergAppendDataExec(a)
     case other => other
   }
 }
 
 case class OffloadIcebergReplaceData() extends OffloadSingleNode {
   override def offload(plan: SparkPlan): SparkPlan = plan match {
-    case r: ReplaceDataExec =>
+    case r: ReplaceDataExec if supportsWrite(r.write) =>
       VeloxIcebergReplaceDataExec(r)
     case other => other
   }
 }
 
 case class OffloadIcebergOverwrite() extends OffloadSingleNode {
   override def offload(plan: SparkPlan): SparkPlan = plan match {
-    case r: OverwriteByExpressionExec =>
+    case r: OverwriteByExpressionExec if supportsWrite(r.write) =>
       VeloxIcebergOverwriteByExpressionExec(r)
     case other => other
   }
 }
 
 case class OffloadIcebergOverwritePartitionsDynamic() extends 
OffloadSingleNode {
   override def offload(plan: SparkPlan): SparkPlan = plan match {
-    case r: OverwritePartitionsDynamicExec =>
+    case r: OverwritePartitionsDynamicExec if supportsWrite(r.write) =>
       VeloxIcebergOverwritePartitionsDynamicExec(r)
     case other => other
   }
 }
 
 object OffloadIcebergWrite {
+
+  def supportsWrite(write: Write): Boolean = {

Review Comment:
   I added reflection here so that this rule still executes correctly when the 
Iceberg dependency is not on the classpath. A similar approach is used in 
offloadBatchScan.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to