This is an automated email from the ASF dual-hosted git repository.
felixybw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 798ddcc541 [CORE][UI] Fix fallback info for V2 writes and align plan
with Spark SQL tab (#11853)
798ddcc541 is described below
commit 798ddcc5415e6fd480ea198270387c392d9ed62d
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Apr 10 07:30:45 2026 +0800
[CORE][UI] Fix fallback info for V2 writes and align plan with Spark SQL
tab (#11853)
Fix Gluten UI not displaying fallback info for V2 writes (e.g., Iceberg
INSERT with sort order), and align the plan display with Spark SQL tab
(consistent operator IDs, complete plan tree).
---
.../execution/enhanced/VeloxIcebergSuite.scala | 27 ++++++++
.../apache/gluten/execution/FallbackSuite.scala | 2 +-
.../gluten/backendsapi/SubstraitBackend.scala | 2 +
.../spark/sql/execution/GlutenExplainUtils.scala | 17 ++++-
.../sql/execution/GlutenFallbackReporter.scala | 31 +++++----
.../spark/sql/execution/GlutenImplicits.scala | 6 +-
.../execution/GlutenQueryExecutionListener.scala | 80 ++++++++++++++++++++++
.../execution/ui/GlutenSQLAppStatusListener.scala | 9 ++-
.../spark/sql/gluten/GlutenFallbackSuite.scala | 2 +-
.../spark/sql/gluten/GlutenFallbackSuite.scala | 2 +-
.../spark/sql/gluten/GlutenFallbackSuite.scala | 2 +-
.../spark/sql/gluten/GlutenFallbackSuite.scala | 2 +-
.../spark/sql/gluten/GlutenFallbackSuite.scala | 2 +-
13 files changed, 161 insertions(+), 23 deletions(-)
diff --git
a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
index 888ccf6ec0..a51f2d7717 100644
---
a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
+++
b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
@@ -21,6 +21,8 @@ import org.apache.gluten.tags.EnhancedFeaturesTest
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.execution.CommandResultExec
+import org.apache.spark.sql.execution.GlutenImplicits._
+import org.apache.spark.sql.execution.datasources.v2.AppendDataExec
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.gluten.TestUtils
@@ -383,4 +385,29 @@ class VeloxIcebergSuite extends IcebergSuite {
}
}
}
+
+ test("iceberg native write fallback when validation fails - sort order") {
+ withTable("iceberg_sorted_tbl") {
+ spark.sql("CREATE TABLE iceberg_sorted_tbl (a INT, b STRING) USING
iceberg")
+ spark.sql("ALTER TABLE iceberg_sorted_tbl WRITE ORDERED BY a")
+
+ val df = spark.sql("INSERT INTO iceberg_sorted_tbl VALUES (1, 'hello'),
(2, 'world')")
+
+ // Should fallback to vanilla Spark's AppendDataExec.
+ val commandPlan =
+
df.queryExecution.executedPlan.asInstanceOf[CommandResultExec].commandPhysicalPlan
+ assert(commandPlan.isInstanceOf[AppendDataExec])
+ assert(!commandPlan.isInstanceOf[VeloxIcebergAppendDataExec])
+
+ checkAnswer(
+ spark.sql("SELECT * FROM iceberg_sorted_tbl ORDER BY a"),
+ Seq(Row(1, "hello"), Row(2, "world")))
+
+ // Verify fallbackSummary reports the sort order fallback reason.
+ val summary = df.fallbackSummary()
+ assert(
+ summary.fallbackNodeToReason.exists(
+ _.values.exists(_.contains("Not support write table with sort
order"))))
+ }
+ }
}
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
index b06db41fe2..b77c8c2f08 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
@@ -345,7 +345,7 @@ class FallbackSuite extends VeloxWholeStageTransformerSuite
with AdaptiveSparkPl
val fallbackReasons = events.flatMap(_.fallbackNodeToReason.values)
assert(fallbackReasons.nonEmpty)
assert(
- fallbackReasons.forall(
+ fallbackReasons.exists(
_.contains("[FallbackByUserOptions] Validation failed on node
Sort")))
} finally {
spark.sparkContext.removeSparkListener(listener)
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SubstraitBackend.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SubstraitBackend.scala
index 618210bbf0..1acf40cb43 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SubstraitBackend.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SubstraitBackend.scala
@@ -28,6 +28,7 @@ import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.api.plugin.PluginContext
import org.apache.spark.internal.Logging
import org.apache.spark.softaffinity.SoftAffinityListener
+import org.apache.spark.sql.execution.GlutenQueryExecutionListener
import org.apache.spark.sql.execution.adaptive.GlutenCostEvaluator
import org.apache.spark.sql.execution.ui.{GlutenSQLAppStatusListener,
GlutenUIUtils}
import org.apache.spark.sql.internal.SparkConfigUtil._
@@ -45,6 +46,7 @@ trait SubstraitBackend extends Backend with Logging {
// Register Gluten listeners
GlutenSQLAppStatusListener.register(sc)
+ GlutenQueryExecutionListener.register(sc)
if (conf.get(GLUTEN_SOFT_AFFINITY_ENABLED)) {
SoftAffinityListener.register(sc)
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala
index 1e5beeb7c1..dd115efa54 100644
---
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala
+++
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala
@@ -91,7 +91,11 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
def collect(tmp: QueryPlan[_]): Unit = {
tmp.foreachUp {
case _: ExecutedCommandExec =>
- case _: CommandResultExec =>
+ case cmd: CommandResultExec => collect(cmd.commandPhysicalPlan)
+ case p: V2CommandExec
+ if FallbackTags.nonEmpty(p) ||
+ p.logicalLink.exists(FallbackTags.getOption(_).nonEmpty) =>
+ handleVanillaSparkPlan(p, fallbackNodeToReason)
case _: V2CommandExec =>
case _: DataWritingCommandExec =>
case _: WholeStageCodegenExec =>
@@ -307,6 +311,14 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
plan.foreachUp {
case _: WholeStageCodegenExec =>
case _: InputAdapter =>
+ case cmd: CommandResultExec =>
+ currentOperationID = generateOperatorIDs(
+ cmd.commandPhysicalPlan,
+ currentOperationID,
+ visited,
+ reusedExchanges,
+ addReusedExchanges)
+ setOpId(cmd)
case p: AdaptiveSparkPlanExec =>
currentOperationID = generateOperatorIDs(
p.executedPlan,
@@ -353,6 +365,8 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
getSubqueries(a.executedPlan, subqueries)
case q: QueryStageExec =>
getSubqueries(q.plan, subqueries)
+ case cmd: CommandResultExec =>
+ getSubqueries(cmd.commandPhysicalPlan, subqueries)
case p: SparkPlan =>
p.expressions.foreach(_.collect {
case e: PlanExpression[_] =>
@@ -383,6 +397,7 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
plan.foreach {
case p: AdaptiveSparkPlanExec => remove(p, Seq(p.executedPlan,
p.initialPlan))
case p: QueryStageExec => remove(p, Seq(p.plan))
+ case cmd: CommandResultExec => remove(cmd, Seq(cmd.commandPhysicalPlan))
case plan: QueryPlan[_] => remove(plan, plan.innerChildren)
}
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala
index 99c34122d2..0ef0b6a28c 100644
---
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala
+++
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala
@@ -55,19 +55,24 @@ case class GlutenFallbackReporter(glutenConf: GlutenConfig,
spark: SparkSession)
private def printFallbackReason(plan: SparkPlan): Unit = {
val validationLogLevel = glutenConf.validationLogLevel
- plan.foreachUp {
- case _: GlutenPlan => // ignore
- case p: SparkPlan if FallbackTags.nonEmpty(p) =>
- val tag = FallbackTags.get(p)
- logFallbackReason(validationLogLevel, p.nodeName, tag.reason())
- // With in next round stage in AQE, the physical plan would be a new
instance that
- // can not preserve the tag, so we need to set the fallback reason to
logical plan.
- // Then we can be aware of the fallback reason for the whole plan.
- // If a logical plan mapping to several physical plan, we add all
reason into
- // that logical plan to make sure we do not lose any fallback reason.
- p.logicalLink.foreach(logicalPlan => FallbackTags.add(logicalPlan,
tag))
- case _ =>
+ def printPlan(p: SparkPlan): Unit = {
+ p.foreachUp {
+ case _: GlutenPlan => // ignore
+ case cmd: CommandResultExec =>
+ printPlan(cmd.commandPhysicalPlan)
+ case p: SparkPlan if FallbackTags.nonEmpty(p) =>
+ val tag = FallbackTags.get(p)
+ logFallbackReason(validationLogLevel, p.nodeName, tag.reason())
+ // Within the next round stage in AQE, the physical plan would be a new
instance that
+ // cannot preserve the tag, so we need to set the fallback reason
to the logical plan.
+ // Then we can be aware of the fallback reason for the whole plan.
+ // If a logical plan maps to several physical plans, we add all
reasons into
+ // that logical plan to make sure we do not lose any fallback reason.
+ p.logicalLink.foreach(logicalPlan => FallbackTags.add(logicalPlan,
tag))
+ case _ =>
+ }
}
+ printPlan(plan)
}
private def postFallbackReason(plan: SparkPlan): Unit = {
@@ -91,5 +96,3 @@ case class GlutenFallbackReporter(glutenConf: GlutenConfig,
spark: SparkSession)
GlutenUIUtils.postEvent(sc, event)
}
}
-
-object GlutenFallbackReporter {}
diff --git
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
index 48710a9edf..196de672f7 100644
---
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
+++
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.execution
import org.apache.gluten.exception.GlutenException
import org.apache.gluten.execution.{GlutenPlan, WholeStageTransformer}
+import org.apache.gluten.extension.columnar.FallbackTags
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.utils.PlanUtil
import org.apache.spark.sql.{Column, Dataset, SparkSession}
@@ -107,7 +108,10 @@ object GlutenImplicits {
def collect(tmp: QueryPlan[_]): Unit = {
tmp.foreachUp {
case _: ExecutedCommandExec =>
- case _: CommandResultExec =>
+ case cmd: CommandResultExec => collect(cmd.commandPhysicalPlan)
+ case p: V2CommandExec if FallbackTags.nonEmpty(p) ||
+ p.logicalLink.exists(FallbackTags.getOption(_).nonEmpty) =>
+ GlutenExplainUtils.handleVanillaSparkPlan(p, fallbackNodeToReason)
case _: V2CommandExec =>
case _: DataWritingCommandExec =>
case _: WholeStageCodegenExec =>
diff --git
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenQueryExecutionListener.scala
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenQueryExecutionListener.scala
new file mode 100644
index 0000000000..32f7ef2cd0
--- /dev/null
+++
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenQueryExecutionListener.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.gluten.events.GlutenPlanFallbackEvent
+
+import org.apache.spark.SparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
+import org.apache.spark.sql.execution.ui.{GlutenUIUtils,
SparkListenerSQLExecutionEnd}
+
+/** A SparkListener that generates complete Gluten UI data after query
execution completes. */
+class GlutenQueryExecutionListener(sc: SparkContext) extends SparkListener
with Logging {
+
+ override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
+ case e: SparkListenerSQLExecutionEnd =>
+ onSQLExecutionEnd(e)
+ case _ =>
+ }
+
+ private def onSQLExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
+ try {
+ val qe = event.qe
+ if (qe == null) {
+ // History Server replay or edge case. Rely on per-stage events
already in event log.
+ return
+ }
+
+ val summary =
+ GlutenImplicits.collectQueryExecutionFallbackSummary(qe.sparkSession,
qe)
+
+ // Combine plan descriptions and fallback reasons from all segments.
+ val planStringBuilder = new StringBuilder()
+ planStringBuilder.append("== Physical Plan ==\n")
+ summary.physicalPlanDescription.foreach(planStringBuilder.append)
+ val combinedFallbackReasons =
+ summary.fallbackNodeToReason.foldLeft(Map.empty[String, String])(_ ++
_)
+
+ // Post event to listener bus. The event is serialized to event log, so
History Server
+ // can replay it. GlutenSQLAppStatusListener receives this event and
writes to kvstore.
+ val fallbackEvent = GlutenPlanFallbackEvent(
+ event.executionId,
+ summary.numGlutenNodes,
+ combinedFallbackReasons.size,
+ planStringBuilder.toString(),
+ combinedFallbackReasons
+ )
+ GlutenUIUtils.postEvent(sc, fallbackEvent)
+ } catch {
+ case e: Exception =>
+ logWarning(
+ s"Failed to generate complete fallback data for execution
${event.executionId}",
+ e)
+ }
+ }
+}
+
+object GlutenQueryExecutionListener {
+
+ /** Register the listener on the status queue. Should be called once during
driver start. */
+ def register(sc: SparkContext): Unit = {
+ if (GlutenUIUtils.uiEnabled(sc)) {
+ sc.listenerBus.addToStatusQueue(new GlutenQueryExecutionListener(sc))
+ }
+ }
+}
diff --git
a/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenSQLAppStatusListener.scala
b/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenSQLAppStatusListener.scala
index a2f608f64d..fe516f7300 100644
---
a/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenSQLAppStatusListener.scala
+++
b/gluten-ui/src/main/scala/org/apache/spark/sql/execution/ui/GlutenSQLAppStatusListener.scala
@@ -42,7 +42,14 @@ private class GlutenSQLAppStatusListener(conf: SparkConf,
kvstore: ElementTracki
}
private def onGlutenPlanFallback(event: GlutenPlanFallbackEvent): Unit = {
- val description = executionIdToDescription.get(event.executionId)
+ // Resolve description: from memory cache, or from kvstore (for complete
event after END).
+ val description = executionIdToDescription.get(event.executionId).orElse {
+ try {
+ Some(kvstore.read(classOf[GlutenSQLExecutionUIData],
event.executionId).description)
+ } catch {
+ case _: NoSuchElementException => None
+ }
+ }
if (description.isDefined) {
val uiData = new GlutenSQLExecutionUIData(
event.executionId,
diff --git
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
index da6d58d34c..2da437c45d 100644
---
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
+++
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
@@ -233,7 +233,7 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with
AdaptiveSparkPlanHelp
events.count(
_.fallbackNodeToReason.values.toSet.exists(_.contains(
"Could not find a valid substrait mapping name for max"
- ))) == 2)
+ ))) == 3)
} finally {
spark.sparkContext.removeSparkListener(listener)
}
diff --git
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
index df84620716..f4e6e8203b 100644
---
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
+++
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
@@ -224,7 +224,7 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with
AdaptiveSparkPlanHelp
events.count(
_.fallbackNodeToReason.values.toSet.exists(_.contains(
"Could not find a valid substrait mapping name for max"
- ))) == 2)
+ ))) == 3)
} finally {
spark.sparkContext.removeSparkListener(listener)
}
diff --git
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
index df84620716..f4e6e8203b 100644
---
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
+++
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
@@ -224,7 +224,7 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with
AdaptiveSparkPlanHelp
events.count(
_.fallbackNodeToReason.values.toSet.exists(_.contains(
"Could not find a valid substrait mapping name for max"
- ))) == 2)
+ ))) == 3)
} finally {
spark.sparkContext.removeSparkListener(listener)
}
diff --git
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
index df84620716..f4e6e8203b 100644
---
a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
+++
b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
@@ -224,7 +224,7 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with
AdaptiveSparkPlanHelp
events.count(
_.fallbackNodeToReason.values.toSet.exists(_.contains(
"Could not find a valid substrait mapping name for max"
- ))) == 2)
+ ))) == 3)
} finally {
spark.sparkContext.removeSparkListener(listener)
}
diff --git
a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
index df84620716..f4e6e8203b 100644
---
a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
+++
b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
@@ -224,7 +224,7 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with
AdaptiveSparkPlanHelp
events.count(
_.fallbackNodeToReason.values.toSet.exists(_.contains(
"Could not find a valid substrait mapping name for max"
- ))) == 2)
+ ))) == 3)
} finally {
spark.sparkContext.removeSparkListener(listener)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]