This is an automated email from the ASF dual-hosted git repository.

felixybw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 798ddcc541 [CORE][UI] Fix fallback info for V2 writes and align plan 
with Spark SQL tab (#11853)
798ddcc541 is described below

commit 798ddcc5415e6fd480ea198270387c392d9ed62d
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Apr 10 07:30:45 2026 +0800

    [CORE][UI] Fix fallback info for V2 writes and align plan with Spark SQL 
tab (#11853)
    
    Fix Gluten UI not displaying fallback info for V2 writes (e.g., Iceberg 
INSERT with sort order), and align the plan display with Spark SQL tab 
(consistent operator IDs, complete plan tree).
---
 .../execution/enhanced/VeloxIcebergSuite.scala     | 27 ++++++++
 .../apache/gluten/execution/FallbackSuite.scala    |  2 +-
 .../gluten/backendsapi/SubstraitBackend.scala      |  2 +
 .../spark/sql/execution/GlutenExplainUtils.scala   | 17 ++++-
 .../sql/execution/GlutenFallbackReporter.scala     | 31 +++++----
 .../spark/sql/execution/GlutenImplicits.scala      |  6 +-
 .../execution/GlutenQueryExecutionListener.scala   | 80 ++++++++++++++++++++++
 .../execution/ui/GlutenSQLAppStatusListener.scala  |  9 ++-
 .../spark/sql/gluten/GlutenFallbackSuite.scala     |  2 +-
 .../spark/sql/gluten/GlutenFallbackSuite.scala     |  2 +-
 .../spark/sql/gluten/GlutenFallbackSuite.scala     |  2 +-
 .../spark/sql/gluten/GlutenFallbackSuite.scala     |  2 +-
 .../spark/sql/gluten/GlutenFallbackSuite.scala     |  2 +-
 13 files changed, 161 insertions(+), 23 deletions(-)

diff --git 
a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
 
b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
index 888ccf6ec0..a51f2d7717 100644
--- 
a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
+++ 
b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
@@ -21,6 +21,8 @@ import org.apache.gluten.tags.EnhancedFeaturesTest
 
 import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.execution.CommandResultExec
+import org.apache.spark.sql.execution.GlutenImplicits._
+import org.apache.spark.sql.execution.datasources.v2.AppendDataExec
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.gluten.TestUtils
 
@@ -383,4 +385,29 @@ class VeloxIcebergSuite extends IcebergSuite {
       }
     }
   }
+
+  test("iceberg native write fallback when validation fails - sort order") {
+    withTable("iceberg_sorted_tbl") {
+      spark.sql("CREATE TABLE iceberg_sorted_tbl (a INT, b STRING) USING 
iceberg")
+      spark.sql("ALTER TABLE iceberg_sorted_tbl WRITE ORDERED BY a")
+
+      val df = spark.sql("INSERT INTO iceberg_sorted_tbl VALUES (1, 'hello'), 
(2, 'world')")
+
+      // Should fallback to vanilla Spark's AppendDataExec.
+      val commandPlan =
+        
df.queryExecution.executedPlan.asInstanceOf[CommandResultExec].commandPhysicalPlan
+      assert(commandPlan.isInstanceOf[AppendDataExec])
+      assert(!commandPlan.isInstanceOf[VeloxIcebergAppendDataExec])
+
+      checkAnswer(
+        spark.sql("SELECT * FROM iceberg_sorted_tbl ORDER BY a"),
+        Seq(Row(1, "hello"), Row(2, "world")))
+
+      // Verify fallbackSummary reports the sort order fallback reason.
+      val summary = df.fallbackSummary()
+      assert(
+        summary.fallbackNodeToReason.exists(
+          _.values.exists(_.contains("Not support write table with sort 
order"))))
+    }
+  }
 }
diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala 
b/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
index b06db41fe2..b77c8c2f08 100644
--- 
a/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
@@ -345,7 +345,7 @@ class FallbackSuite extends VeloxWholeStageTransformerSuite 
with AdaptiveSparkPl
         val fallbackReasons = events.flatMap(_.fallbackNodeToReason.values)
         assert(fallbackReasons.nonEmpty)
         assert(
-          fallbackReasons.forall(
+          fallbackReasons.exists(
             _.contains("[FallbackByUserOptions] Validation failed on node 
Sort")))
       } finally {
         spark.sparkContext.removeSparkListener(listener)
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SubstraitBackend.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SubstraitBackend.scala
index 618210bbf0..1acf40cb43 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SubstraitBackend.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SubstraitBackend.scala
@@ -28,6 +28,7 @@ import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.api.plugin.PluginContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.softaffinity.SoftAffinityListener
+import org.apache.spark.sql.execution.GlutenQueryExecutionListener
 import org.apache.spark.sql.execution.adaptive.GlutenCostEvaluator
 import org.apache.spark.sql.execution.ui.{GlutenSQLAppStatusListener, 
GlutenUIUtils}
 import org.apache.spark.sql.internal.SparkConfigUtil._
@@ -45,6 +46,7 @@ trait SubstraitBackend extends Backend with Logging {
 
     // Register Gluten listeners
     GlutenSQLAppStatusListener.register(sc)
+    GlutenQueryExecutionListener.register(sc)
     if (conf.get(GLUTEN_SOFT_AFFINITY_ENABLED)) {
       SoftAffinityListener.register(sc)
     }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala
 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala
index 1e5beeb7c1..dd115efa54 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala
@@ -91,7 +91,11 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
     def collect(tmp: QueryPlan[_]): Unit = {
       tmp.foreachUp {
         case _: ExecutedCommandExec =>
-        case _: CommandResultExec =>
+        case cmd: CommandResultExec => collect(cmd.commandPhysicalPlan)
+        case p: V2CommandExec
+            if FallbackTags.nonEmpty(p) ||
+              p.logicalLink.exists(FallbackTags.getOption(_).nonEmpty) =>
+          handleVanillaSparkPlan(p, fallbackNodeToReason)
         case _: V2CommandExec =>
         case _: DataWritingCommandExec =>
         case _: WholeStageCodegenExec =>
@@ -307,6 +311,14 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
     plan.foreachUp {
       case _: WholeStageCodegenExec =>
       case _: InputAdapter =>
+      case cmd: CommandResultExec =>
+        currentOperationID = generateOperatorIDs(
+          cmd.commandPhysicalPlan,
+          currentOperationID,
+          visited,
+          reusedExchanges,
+          addReusedExchanges)
+        setOpId(cmd)
       case p: AdaptiveSparkPlanExec =>
         currentOperationID = generateOperatorIDs(
           p.executedPlan,
@@ -353,6 +365,8 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
         getSubqueries(a.executedPlan, subqueries)
       case q: QueryStageExec =>
         getSubqueries(q.plan, subqueries)
+      case cmd: CommandResultExec =>
+        getSubqueries(cmd.commandPhysicalPlan, subqueries)
       case p: SparkPlan =>
         p.expressions.foreach(_.collect {
           case e: PlanExpression[_] =>
@@ -383,6 +397,7 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
     plan.foreach {
       case p: AdaptiveSparkPlanExec => remove(p, Seq(p.executedPlan, 
p.initialPlan))
       case p: QueryStageExec => remove(p, Seq(p.plan))
+      case cmd: CommandResultExec => remove(cmd, Seq(cmd.commandPhysicalPlan))
       case plan: QueryPlan[_] => remove(plan, plan.innerChildren)
     }
   }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala
 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala
index 99c34122d2..0ef0b6a28c 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala
@@ -55,19 +55,24 @@ case class GlutenFallbackReporter(glutenConf: GlutenConfig, 
spark: SparkSession)
 
   private def printFallbackReason(plan: SparkPlan): Unit = {
     val validationLogLevel = glutenConf.validationLogLevel
-    plan.foreachUp {
-      case _: GlutenPlan => // ignore
-      case p: SparkPlan if FallbackTags.nonEmpty(p) =>
-        val tag = FallbackTags.get(p)
-        logFallbackReason(validationLogLevel, p.nodeName, tag.reason())
-        // With in next round stage in AQE, the physical plan would be a new 
instance that
-        // can not preserve the tag, so we need to set the fallback reason to 
logical plan.
-        // Then we can be aware of the fallback reason for the whole plan.
-        // If a logical plan mapping to several physical plan, we add all 
reason into
-        // that logical plan to make sure we do not lose any fallback reason.
-        p.logicalLink.foreach(logicalPlan => FallbackTags.add(logicalPlan, 
tag))
-      case _ =>
+    def printPlan(p: SparkPlan): Unit = {
+      p.foreachUp {
+        case _: GlutenPlan => // ignore
+        case cmd: CommandResultExec =>
+          printPlan(cmd.commandPhysicalPlan)
+        case p: SparkPlan if FallbackTags.nonEmpty(p) =>
+          val tag = FallbackTags.get(p)
+          logFallbackReason(validationLogLevel, p.nodeName, tag.reason())
+          // Within the next round stage in AQE, the physical plan would be a new 
instance that
+          // cannot preserve the tag, so we need to set the fallback reason 
to the logical plan.
+          // Then we can be aware of the fallback reason for the whole plan.
+          // If a logical plan maps to several physical plans, we add all 
reasons into
+          // that logical plan to make sure we do not lose any fallback reason.
+          p.logicalLink.foreach(logicalPlan => FallbackTags.add(logicalPlan, 
tag))
+        case _ =>
+      }
     }
+    printPlan(plan)
   }
 
   private def postFallbackReason(plan: SparkPlan): Unit = {
@@ -91,5 +96,3 @@ case class GlutenFallbackReporter(glutenConf: GlutenConfig, 
spark: SparkSession)
     GlutenUIUtils.postEvent(sc, event)
   }
 }
-
-object GlutenFallbackReporter {}
diff --git 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
index 48710a9edf..196de672f7 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.execution
 
 import org.apache.gluten.exception.GlutenException
 import org.apache.gluten.execution.{GlutenPlan, WholeStageTransformer}
+import org.apache.gluten.extension.columnar.FallbackTags
 import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.gluten.utils.PlanUtil
 import org.apache.spark.sql.{Column, Dataset, SparkSession}
@@ -107,7 +108,10 @@ object GlutenImplicits {
     def collect(tmp: QueryPlan[_]): Unit = {
       tmp.foreachUp {
         case _: ExecutedCommandExec =>
-        case _: CommandResultExec =>
+        case cmd: CommandResultExec => collect(cmd.commandPhysicalPlan)
+        case p: V2CommandExec if FallbackTags.nonEmpty(p) ||
+            p.logicalLink.exists(FallbackTags.getOption(_).nonEmpty) =>
+          GlutenExplainUtils.handleVanillaSparkPlan(p, fallbackNodeToReason)
         case _: V2CommandExec =>
         case _: DataWritingCommandExec =>
         case _: WholeStageCodegenExec =>
diff --git 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenQueryExecutionListener.scala
 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenQueryExecutionListener.scala
new file mode 100644
index 0000000000..32f7ef2cd0
--- /dev/null
+++ 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenQueryExecutionListener.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.gluten.events.GlutenPlanFallbackEvent
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
+import org.apache.spark.sql.execution.ui.{GlutenUIUtils, 
SparkListenerSQLExecutionEnd}
+
+/** A SparkListener that generates complete Gluten UI data after query 
execution completes. */
+class GlutenQueryExecutionListener(sc: SparkContext) extends SparkListener 
with Logging {
+
+  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
+    case e: SparkListenerSQLExecutionEnd =>
+      onSQLExecutionEnd(e)
+    case _ =>
+  }
+
+  private def onSQLExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
+    try {
+      val qe = event.qe
+      if (qe == null) {
+        // History Server replay or edge case. Rely on per-stage events 
already in event log.
+        return
+      }
+
+      val summary =
+        GlutenImplicits.collectQueryExecutionFallbackSummary(qe.sparkSession, 
qe)
+
+      // Combine plan descriptions and fallback reasons from all segments.
+      val planStringBuilder = new StringBuilder()
+      planStringBuilder.append("== Physical Plan ==\n")
+      summary.physicalPlanDescription.foreach(planStringBuilder.append)
+      val combinedFallbackReasons =
+        summary.fallbackNodeToReason.foldLeft(Map.empty[String, String])(_ ++ 
_)
+
+      // Post event to listener bus. The event is serialized to event log, so 
History Server
+      // can replay it. GlutenSQLAppStatusListener receives this event and 
writes to kvstore.
+      val fallbackEvent = GlutenPlanFallbackEvent(
+        event.executionId,
+        summary.numGlutenNodes,
+        combinedFallbackReasons.size,
+        planStringBuilder.toString(),
+        combinedFallbackReasons
+      )
+      GlutenUIUtils.postEvent(sc, fallbackEvent)
+    } catch {
+      case e: Exception =>
+        logWarning(
+          s"Failed to generate complete fallback data for execution 
${event.executionId}",
+          e)
+    }
+  }
+}
+
+object GlutenQueryExecutionListener {
+
+  /** Register the listener on the status queue. Should be called once during 
driver start. */
+  def register(sc: SparkContext): Unit = {
+    if (GlutenUIUtils.uiEnabled(sc)) {
+      sc.listenerBus.addToStatusQueue(new GlutenQueryExecutionListener(sc))
+    }
+  }
+}
diff --git 
a/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenSQLAppStatusListener.scala
 
b/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenSQLAppStatusListener.scala
index a2f608f64d..fe516f7300 100644
--- 
a/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenSQLAppStatusListener.scala
+++ 
b/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenSQLAppStatusListener.scala
@@ -42,7 +42,14 @@ private class GlutenSQLAppStatusListener(conf: SparkConf, 
kvstore: ElementTracki
   }
 
   private def onGlutenPlanFallback(event: GlutenPlanFallbackEvent): Unit = {
-    val description = executionIdToDescription.get(event.executionId)
+    // Resolve description: from memory cache, or from kvstore (for complete 
event after END).
+    val description = executionIdToDescription.get(event.executionId).orElse {
+      try {
+        Some(kvstore.read(classOf[GlutenSQLExecutionUIData], 
event.executionId).description)
+      } catch {
+        case _: NoSuchElementException => None
+      }
+    }
     if (description.isDefined) {
       val uiData = new GlutenSQLExecutionUIData(
         event.executionId,
diff --git 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
index da6d58d34c..2da437c45d 100644
--- 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
+++ 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
@@ -233,7 +233,7 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with 
AdaptiveSparkPlanHelp
             events.count(
               _.fallbackNodeToReason.values.toSet.exists(_.contains(
                 "Could not find a valid substrait mapping name for max"
-              ))) == 2)
+              ))) == 3)
         } finally {
           spark.sparkContext.removeSparkListener(listener)
         }
diff --git 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
index df84620716..f4e6e8203b 100644
--- 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
+++ 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
@@ -224,7 +224,7 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with 
AdaptiveSparkPlanHelp
             events.count(
               _.fallbackNodeToReason.values.toSet.exists(_.contains(
                 "Could not find a valid substrait mapping name for max"
-              ))) == 2)
+              ))) == 3)
         } finally {
           spark.sparkContext.removeSparkListener(listener)
         }
diff --git 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
index df84620716..f4e6e8203b 100644
--- 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
+++ 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
@@ -224,7 +224,7 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with 
AdaptiveSparkPlanHelp
             events.count(
               _.fallbackNodeToReason.values.toSet.exists(_.contains(
                 "Could not find a valid substrait mapping name for max"
-              ))) == 2)
+              ))) == 3)
         } finally {
           spark.sparkContext.removeSparkListener(listener)
         }
diff --git 
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
 
b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
index df84620716..f4e6e8203b 100644
--- 
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
+++ 
b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
@@ -224,7 +224,7 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with 
AdaptiveSparkPlanHelp
             events.count(
               _.fallbackNodeToReason.values.toSet.exists(_.contains(
                 "Could not find a valid substrait mapping name for max"
-              ))) == 2)
+              ))) == 3)
         } finally {
           spark.sparkContext.removeSparkListener(listener)
         }
diff --git 
a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
 
b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
index df84620716..f4e6e8203b 100644
--- 
a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
+++ 
b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
@@ -224,7 +224,7 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with 
AdaptiveSparkPlanHelp
             events.count(
               _.fallbackNodeToReason.values.toSet.exists(_.contains(
                 "Could not find a valid substrait mapping name for max"
-              ))) == 2)
+              ))) == 3)
         } finally {
           spark.sparkContext.removeSparkListener(listener)
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to