This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 2e27a52eea [GLUTEN-8580][CORE][Part-2] Don't validate project generated by PushDownInputFileExpression (#8585)
2e27a52eea is described below

commit 2e27a52eea32bca5c0f05494b8e3ed9da9ed5cbf
Author: Mingliang Zhu <[email protected]>
AuthorDate: Thu Jan 23 19:35:22 2025 +0800

    [GLUTEN-8580][CORE][Part-2] Don't validate project generated by PushDownInputFileExpression (#8585)
---
 .../columnar/heuristic/AddFallbackTags.scala          |  5 ++++-
 .../columnar/PushDownInputFileExpression.scala        | 19 +++++++++++++------
 2 files changed, 17 insertions(+), 7 deletions(-)

diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/AddFallbackTags.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/AddFallbackTags.scala
index b3458e1385..3ccce3b46c 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/AddFallbackTags.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/AddFallbackTags.scala
@@ -25,7 +25,10 @@ import org.apache.spark.sql.execution.SparkPlan
 // Add fallback tags when validator returns negative outcome.
 case class AddFallbackTags(validator: Validator) extends Rule[SparkPlan] {
   def apply(plan: SparkPlan): SparkPlan = {
-    plan.foreachUp { case p => addFallbackTag(p) }
+    plan.foreachUp {
+      case p if FallbackTags.maybeOffloadable(p) => addFallbackTag(p)
+      case _ =>
+    }
     plan
   }
 
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/PushDownInputFileExpression.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/PushDownInputFileExpression.scala
index e92ffb9438..778bd62b6d 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/PushDownInputFileExpression.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/PushDownInputFileExpression.scala
@@ -46,6 +46,11 @@ object PushDownInputFileExpression {
     }
   }
 
+  def addFallbackTag(plan: SparkPlan): SparkPlan = {
+    FallbackTags.add(plan, "fallback input file expression")
+    plan
+  }
+
   object PreOffload extends Rule[SparkPlan] {
     override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
       case ProjectExec(projectList, child) if 
projectList.exists(containsInputFileRelatedExpr) =>
@@ -82,11 +87,11 @@ object PushDownInputFileExpression {
         replacedExprs: mutable.Map[String, Alias]): SparkPlan =
       plan match {
         case p: LeafExecNode =>
-          ProjectExec(p.output ++ replacedExprs.values, p)
+          addFallbackTag(ProjectExec(p.output ++ replacedExprs.values, p))
         // Output of SerializeFromObjectExec's child and output of 
DeserializeToObjectExec must be
         // a single-field row.
         case p @ (_: SerializeFromObjectExec | _: DeserializeToObjectExec) =>
-          ProjectExec(p.output ++ replacedExprs.values, p)
+          addFallbackTag(ProjectExec(p.output ++ replacedExprs.values, p))
         case p: ProjectExec =>
           p.copy(
             projectList = p.projectList ++ 
replacedExprs.values.toSeq.map(_.toAttribute),
@@ -115,11 +120,13 @@ object PushDownInputFileExpression {
           if projectList.exists(containsInputFileRelatedExpr) =>
         child.copy(output = p.output.asInstanceOf[Seq[AttributeReference]])
       case p1 @ ProjectExec(_, p2: ProjectExec) if canCollapseProject(p2) =>
-        p2.copy(projectList =
-          CollapseProjectShim.buildCleanedProjectList(p1.projectList, 
p2.projectList))
+        addFallbackTag(
+          p2.copy(projectList =
+            CollapseProjectShim.buildCleanedProjectList(p1.projectList, 
p2.projectList)))
       case p1 @ ProjectExecTransformer(_, p2: ProjectExec) if 
canCollapseProject(p1, p2) =>
-        p2.copy(projectList =
-          CollapseProjectShim.buildCleanedProjectList(p1.projectList, 
p2.projectList))
+        addFallbackTag(
+          p2.copy(projectList =
+            CollapseProjectShim.buildCleanedProjectList(p1.projectList, 
p2.projectList)))
     }
 
     private def canCollapseProject(project: ProjectExec): Boolean = {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to