This is an automated email from the ASF dual-hosted git repository.
liujiayi771 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 0342cdb222 [CORE] Deduplicate fallback reason when merging Appendable tags (#12049)
0342cdb222 is described below
commit 0342cdb222967890ba102800a80e946e79a38985
Author: Joey <[email protected]>
AuthorDate: Mon May 11 21:43:17 2026 +0800
[CORE] Deduplicate fallback reason when merging Appendable tags (#12049)
---
.../apache/gluten/execution/FallbackSuite.scala | 144 ++++++++-------------
.../org/apache/spark/utils/GlutenSuiteUtils.scala | 25 ++++
.../gluten/extension/columnar/FallbackTag.scala | 16 ++-
.../extension/columnar/FallbackTagSuite.scala | 49 +++++++
4 files changed, 138 insertions(+), 96 deletions(-)
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
index 421e0959f1..ddc9cc923e 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
@@ -17,18 +17,14 @@
package org.apache.gluten.execution
import org.apache.gluten.config.{GlutenConfig, VeloxConfig}
-import org.apache.gluten.events.GlutenPlanFallbackEvent
import org.apache.spark.SparkConf
-import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.{ColumnarBroadcastExchangeExec,
ColumnarShuffleExchangeExec, SortExec, SparkPlan}
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper,
AQEShuffleReadExec}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec,
BroadcastNestedLoopJoinExec, SortMergeJoinExec}
import org.apache.spark.utils.GlutenSuiteUtils
-import scala.collection.mutable.ArrayBuffer
-
class FallbackSuite extends VeloxWholeStageTransformerSuite with
AdaptiveSparkPlanHelper {
protected val rootPath: String = getClass.getResource("/").getPath
override protected val resourcePath: String = "/tpch-data-parquet"
@@ -315,80 +311,53 @@ class FallbackSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPl
}
test("get correct fallback reason on nodes without logicalLink") {
- val events = new ArrayBuffer[GlutenPlanFallbackEvent]
- val listener = new SparkListener {
- override def onOtherEvent(event: SparkListenerEvent): Unit = {
- event match {
- case e: GlutenPlanFallbackEvent => events.append(e)
- case _ =>
- }
+ withSQLConf(GlutenConfig.COLUMNAR_SORT_ENABLED.key -> "false") {
+ GlutenSuiteUtils.withFallbackEventListener(spark.sparkContext) {
+ events =>
+ val df = spark.sql("""
+ |SELECT
+ | c1,
+ | c2,
+ | ROW_NUMBER() OVER (PARTITION BY c1 ORDER BY
c2) as row_num,
+ | RANK() OVER (PARTITION BY c1 ORDER BY c2) as
rank_num
+ |FROM tmp1
+ |
+ |""".stripMargin)
+ df.collect()
+ GlutenSuiteUtils.waitUntilEmpty(spark.sparkContext)
+ val sort = find(df.queryExecution.executedPlan) {
+ _.isInstanceOf[SortExec]
+ }
+ assert(sort.isDefined)
+ val fallbackReasons = events.flatMap(_.fallbackNodeToReason.values)
+ assert(fallbackReasons.nonEmpty)
+ assert(
+ fallbackReasons.forall(
+ _.contains("[FallbackByUserOptions] Validation failed on node
Sort")))
}
}
- // Drain any pending events from previous tests before registering the
listener.
- // Spark's LiveListenerBus is async, so events posted but not yet
dispatched will
- // still be delivered to listeners added afterwards, contaminating
`events` here.
- GlutenSuiteUtils.waitUntilEmpty(spark.sparkContext)
- spark.sparkContext.addSparkListener(listener)
- withSQLConf(GlutenConfig.COLUMNAR_SORT_ENABLED.key -> "false") {
- try {
+ }
+
+ test("fallback when nested loop join has unsupported expression") {
+ GlutenSuiteUtils.withFallbackEventListener(spark.sparkContext) {
+ events =>
val df = spark.sql("""
- |SELECT
- | c1,
- | c2,
- | ROW_NUMBER() OVER (PARTITION BY c1 ORDER BY
c2) as row_num,
- | RANK() OVER (PARTITION BY c1 ORDER BY c2) as
rank_num
- |FROM tmp1
- |
+ |select tmp1.c1, tmp1.c2 from tmp1
+ |left join tmp2 on (
+ | tmp1.c1 = regexp_extract(tmp2.c1,
'(?<=@)[^.]+(?=\.)', 0)
+ | or tmp2.c1 > 10
+ |)
|""".stripMargin)
df.collect()
GlutenSuiteUtils.waitUntilEmpty(spark.sparkContext)
- val sort = find(df.queryExecution.executedPlan) {
- _.isInstanceOf[SortExec]
+
+ val nestedLoopJoin = find(df.queryExecution.executedPlan) {
+ _.isInstanceOf[BroadcastNestedLoopJoinExec]
}
- assert(sort.isDefined)
+ assert(nestedLoopJoin.isDefined)
val fallbackReasons = events.flatMap(_.fallbackNodeToReason.values)
assert(fallbackReasons.nonEmpty)
- assert(
- fallbackReasons.forall(
- _.contains("[FallbackByUserOptions] Validation failed on node
Sort")))
- } finally {
- spark.sparkContext.removeSparkListener(listener)
- }
- }
- }
-
- test("fallback when nested loop join has unsupported expression") {
- val events = new ArrayBuffer[GlutenPlanFallbackEvent]
- val listener = new SparkListener {
- override def onOtherEvent(event: SparkListenerEvent): Unit = {
- event match {
- case e: GlutenPlanFallbackEvent => events.append(e)
- case _ =>
- }
- }
- }
- spark.sparkContext.addSparkListener(listener)
-
- try {
- val df = spark.sql("""
- |select tmp1.c1, tmp1.c2 from tmp1
- |left join tmp2 on (
- | tmp1.c1 = regexp_extract(tmp2.c1,
'(?<=@)[^.]+(?=\.)', 0)
- | or tmp2.c1 > 10
- |)
- |""".stripMargin)
- df.collect()
- GlutenSuiteUtils.waitUntilEmpty(spark.sparkContext)
-
- val nestedLoopJoin = find(df.queryExecution.executedPlan) {
- _.isInstanceOf[BroadcastNestedLoopJoinExec]
- }
- assert(nestedLoopJoin.isDefined)
- val fallbackReasons = events.flatMap(_.fallbackNodeToReason.values)
- assert(fallbackReasons.nonEmpty)
- assert(fallbackReasons.forall(_.contains("regexp_extract due to
Pattern")))
- } finally {
- spark.sparkContext.removeSparkListener(listener)
+ assert(fallbackReasons.forall(_.contains("regexp_extract due to
Pattern")))
}
}
@@ -397,31 +366,20 @@ class FallbackSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPl
// GlutenPlanFallbackEvent even when spark.gluten.enabled=false (e.g. the
vanilla baseline run
// inside runQueryAndCompare). All nodes would appear as fallback with the
generic reason
// "Gluten does not touch it or does not support it".
- val events = new ArrayBuffer[GlutenPlanFallbackEvent]
- val listener = new SparkListener {
- override def onOtherEvent(event: SparkListenerEvent): Unit = {
- event match {
- case e: GlutenPlanFallbackEvent => events.append(e)
- case _ =>
- }
- }
- }
- spark.sparkContext.addSparkListener(listener)
- try {
- // Execute a query with gluten disabled — this mimics what
runQueryAndCompare does for the
- // vanilla baseline run. No GlutenPlanFallbackEvent should be emitted at
all.
- withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "false") {
- spark.sql("SELECT c1, count(*) FROM tmp1 GROUP BY c1").collect()
+ withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "false") {
+ GlutenSuiteUtils.withFallbackEventListener(spark.sparkContext) {
+ events =>
+ // Execute a query with gluten disabled — this mimics what
runQueryAndCompare does for
+ // the vanilla baseline run. No GlutenPlanFallbackEvent should be
emitted at all.
+ spark.sql("SELECT c1, count(*) FROM tmp1 GROUP BY c1").collect()
+ GlutenSuiteUtils.waitUntilEmpty(spark.sparkContext)
+ assert(
+ events.isEmpty,
+ s"Expected no GlutenPlanFallbackEvent for vanilla Spark execution,
" +
+ s"but got ${events.size} event(s). " +
+ s"First event fallback reasons:
${events.headOption.map(_.fallbackNodeToReason)}"
+ )
}
- GlutenSuiteUtils.waitUntilEmpty(spark.sparkContext)
- assert(
- events.isEmpty,
- s"Expected no GlutenPlanFallbackEvent for vanilla Spark execution, " +
- s"but got ${events.size} event(s). " +
- s"First event fallback reasons:
${events.headOption.map(_.fallbackNodeToReason)}"
- )
- } finally {
- spark.sparkContext.removeSparkListener(listener)
}
}
}
diff --git a/backends-velox/src/test/scala/org/apache/spark/utils/GlutenSuiteUtils.scala b/backends-velox/src/test/scala/org/apache/spark/utils/GlutenSuiteUtils.scala
index 1453a56072..602a5f4a30 100644
--- a/backends-velox/src/test/scala/org/apache/spark/utils/GlutenSuiteUtils.scala
+++ b/backends-velox/src/test/scala/org/apache/spark/utils/GlutenSuiteUtils.scala
@@ -16,8 +16,33 @@
*/
package org.apache.spark.utils
+import org.apache.gluten.events.GlutenPlanFallbackEvent
+
import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
+
+import scala.collection.mutable.ArrayBuffer
object GlutenSuiteUtils {
def waitUntilEmpty(ctx: SparkContext): Unit =
ctx.listenerBus.waitUntilEmpty()
+
+ /**
+ * Runs `body` with a buffer that collects every [[GlutenPlanFallbackEvent]] posted to `ctx`'s
+ * listener bus while `body` executes, and unregisters the listener afterwards even if `body`
+ * throws.
+ *
+ * Pending events from previous tests are drained before the listener is registered: Spark's
+ * LiveListenerBus is async, so events posted but not yet dispatched would still be delivered
+ * to a listener added afterwards, contaminating the buffer.
+ *
+ * The buffer is filled asynchronously by the listener bus. Inside `body`, after running the
+ * query and before reading the events buffer, call [[waitUntilEmpty]].
+ */
+ def withFallbackEventListener(ctx: SparkContext)(
+ body: ArrayBuffer[GlutenPlanFallbackEvent] => Unit): Unit = {
+ waitUntilEmpty(ctx)
+ // Filled asynchronously by the listener below; `body` should read it only after waitUntilEmpty.
+ val events = new ArrayBuffer[GlutenPlanFallbackEvent]
+ val listener = new SparkListener {
+ // Keep only Gluten fallback events; every other custom event is ignored.
+ override def onOtherEvent(event: SparkListenerEvent): Unit = event match
{
+ case e: GlutenPlanFallbackEvent => events.append(e)
+ case _ =>
+ }
+ }
+ ctx.addSparkListener(listener)
+ // Unregister in `finally` so a failing `body` (e.g. a failed assertion) does not leave the
+ // listener attached to the shared SparkContext.
+ try body(events)
+ finally ctx.removeSparkListener(listener)
+ }
}
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackTag.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackTag.scala
index 32020e68bc..f4702200e7 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackTag.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackTag.scala
@@ -22,6 +22,8 @@ import org.apache.spark.sql.execution.SparkPlan
import org.apache.commons.lang3.exception.ExceptionUtils
+import scala.collection.immutable.HashSet
+
sealed trait FallbackTag {
val stacktrace: Option[String] =
if (FallbackTags.DEBUG) {
@@ -33,8 +35,16 @@ sealed trait FallbackTag {
object FallbackTag {
- /** A tag that stores one reason text of fall back. */
- case class Appendable(override val reason: String) extends FallbackTag
+  /**
+   * A tag that stores distinct reason texts of fall back, in the order they were first added.
+   *
+   * Reasons are held in a `Vector` rather than a `HashSet` so that `reason()` joins them in
+   * first-seen order, as the merge did before deduplication was introduced. A hash set iterates
+   * in hash order, which would reshuffle multi-reason fallback messages. Equal reasons are stored
+   * only once, so repeatedly tagging a node with the same text does not grow its message.
+   */
+  case class Appendable private (private val reasons: Vector[String]) extends FallbackTag {
+
+    /** All distinct reasons, in first-seen order, joined with "; ". */
+    override def reason(): String = reasons.mkString("; ")
+
+    /**
+     * Merges `other` into this tag: this tag's reasons come first, followed by the reasons of
+     * `other` that are not already present. Whole strings are compared, so a reason that is
+     * merely a substring of an existing one is still kept as a separate entry.
+     */
+    def append(other: Appendable): Appendable = {
+      val seen = HashSet(reasons: _*)
+      Appendable(reasons ++ other.reasons.filterNot(seen))
+    }
+  }
+
+  object Appendable {
+
+    /** Creates a tag holding the single reason `reason`. */
+    def apply(reason: String): Appendable = new Appendable(Vector(reason))
+  }
/**
* A tag that stores reason text of fall back. Other reasons will be
discarded when this tag is
@@ -93,7 +103,7 @@ object FallbackTags {
case (exclusive: FallbackTag.Exclusive, _) =>
exclusive
case (l: FallbackTag.Appendable, r: FallbackTag.Appendable) =>
- FallbackTag.Appendable(s"${l.reason}; ${r.reason}")
+ l.append(r)
}
mergedTagOption
.foreach(mergedTag => plan.setTagValue(TAG, mergedTag))
diff --git a/gluten-core/src/test/scala/org/apache/gluten/extension/columnar/FallbackTagSuite.scala b/gluten-core/src/test/scala/org/apache/gluten/extension/columnar/FallbackTagSuite.scala
new file mode 100644
index 0000000000..61408d91fd
--- /dev/null
+++ b/gluten-core/src/test/scala/org/apache/gluten/extension/columnar/FallbackTagSuite.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension.columnar
+
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+
+import org.scalatest.funsuite.AnyFunSuite
+
+/**
+ * Unit tests for how [[FallbackTags]] combines repeated fallback reasons recorded on a single
+ * plan node. An empty `LocalRelation` serves as a minimal node to tag.
+ */
+class FallbackTagSuite extends AnyFunSuite {
+
+ test("repeated tagging with the same reason does not grow the reason
string") {
+ // Simulates AQE re-running fallback rules across stages on the same logicalLink,
+ // or several physical nodes sharing one logicalLink with the same fallback reason
+ // (see GlutenFallbackReporter.printFallbackReason). Without dedup, the five adds below
+ // would produce "r; r; r; r; r" instead of a single "r".
+ val plan = LocalRelation()
+ val reason = "manually fallback by user"
+ (1 to 5).foreach(_ => FallbackTags.add(plan, reason))
+ assert(FallbackTags.get(plan).reason() == reason)
+ }
+
+ // Two different reasons must both survive the merge. Only membership is checked (via toSet),
+ // not the order in which the reasons are joined.
+ test("distinct Appendable reasons are still concatenated") {
+ val plan = LocalRelation()
+ FallbackTags.add(plan, "reason A")
+ FallbackTags.add(plan, "reason B")
+ assert(FallbackTags.get(plan).reason().split("; ").toSet == Set("reason
A", "reason B"))
+ }
+
+ // Dedup must compare whole reasons: "reason" is a substring of "reason A" but is a different
+ // reason and must be kept as its own entry.
+ test("Appendable reasons with substring overlap are still distinct") {
+ val plan = LocalRelation()
+ FallbackTags.add(plan, "reason A")
+ FallbackTags.add(plan, "reason")
+ assert(FallbackTags.get(plan).reason().split("; ").toSet == Set("reason
A", "reason"))
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]