This is an automated email from the ASF dual-hosted git repository.

liujiayi771 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 0342cdb222 [CORE] Deduplicate fallback reason when merging Appendable 
tags (#12049)
0342cdb222 is described below

commit 0342cdb222967890ba102800a80e946e79a38985
Author: Joey <[email protected]>
AuthorDate: Mon May 11 21:43:17 2026 +0800

    [CORE] Deduplicate fallback reason when merging Appendable tags (#12049)
---
 .../apache/gluten/execution/FallbackSuite.scala    | 144 ++++++++-------------
 .../org/apache/spark/utils/GlutenSuiteUtils.scala  |  25 ++++
 .../gluten/extension/columnar/FallbackTag.scala    |  16 ++-
 .../extension/columnar/FallbackTagSuite.scala      |  49 +++++++
 4 files changed, 138 insertions(+), 96 deletions(-)

diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala 
b/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
index 421e0959f1..ddc9cc923e 100644
--- 
a/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
@@ -17,18 +17,14 @@
 package org.apache.gluten.execution
 
 import org.apache.gluten.config.{GlutenConfig, VeloxConfig}
-import org.apache.gluten.events.GlutenPlanFallbackEvent
 
 import org.apache.spark.SparkConf
-import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
 import org.apache.spark.sql.execution.{ColumnarBroadcastExchangeExec, 
ColumnarShuffleExchangeExec, SortExec, SparkPlan}
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, 
AQEShuffleReadExec}
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, 
BroadcastNestedLoopJoinExec, SortMergeJoinExec}
 import org.apache.spark.utils.GlutenSuiteUtils
 
-import scala.collection.mutable.ArrayBuffer
-
 class FallbackSuite extends VeloxWholeStageTransformerSuite with 
AdaptiveSparkPlanHelper {
   protected val rootPath: String = getClass.getResource("/").getPath
   override protected val resourcePath: String = "/tpch-data-parquet"
@@ -315,80 +311,53 @@ class FallbackSuite extends 
VeloxWholeStageTransformerSuite with AdaptiveSparkPl
   }
 
   test("get correct fallback reason on nodes without logicalLink") {
-    val events = new ArrayBuffer[GlutenPlanFallbackEvent]
-    val listener = new SparkListener {
-      override def onOtherEvent(event: SparkListenerEvent): Unit = {
-        event match {
-          case e: GlutenPlanFallbackEvent => events.append(e)
-          case _ =>
-        }
+    withSQLConf(GlutenConfig.COLUMNAR_SORT_ENABLED.key -> "false") {
+      GlutenSuiteUtils.withFallbackEventListener(spark.sparkContext) {
+        events =>
+          val df = spark.sql("""
+                               |SELECT
+                               |  c1,
+                               |  c2,
+                               |  ROW_NUMBER() OVER (PARTITION BY c1 ORDER BY 
c2) as row_num,
+                               |  RANK() OVER (PARTITION BY c1 ORDER BY c2) as 
rank_num
+                               |FROM tmp1
+                               |
+                               |""".stripMargin)
+          df.collect()
+          GlutenSuiteUtils.waitUntilEmpty(spark.sparkContext)
+          val sort = find(df.queryExecution.executedPlan) {
+            _.isInstanceOf[SortExec]
+          }
+          assert(sort.isDefined)
+          val fallbackReasons = events.flatMap(_.fallbackNodeToReason.values)
+          assert(fallbackReasons.nonEmpty)
+          assert(
+            fallbackReasons.forall(
+              _.contains("[FallbackByUserOptions] Validation failed on node 
Sort")))
       }
     }
-    // Drain any pending events from previous tests before registering the 
listener.
-    // Spark's LiveListenerBus is async, so events posted but not yet 
dispatched will
-    // still be delivered to listeners added afterwards, contaminating 
`events` here.
-    GlutenSuiteUtils.waitUntilEmpty(spark.sparkContext)
-    spark.sparkContext.addSparkListener(listener)
-    withSQLConf(GlutenConfig.COLUMNAR_SORT_ENABLED.key -> "false") {
-      try {
+  }
+
+  test("fallback when nested loop join has unsupported expression") {
+    GlutenSuiteUtils.withFallbackEventListener(spark.sparkContext) {
+      events =>
         val df = spark.sql("""
-                             |SELECT
-                             |  c1,
-                             |  c2,
-                             |  ROW_NUMBER() OVER (PARTITION BY c1 ORDER BY 
c2) as row_num,
-                             |  RANK() OVER (PARTITION BY c1 ORDER BY c2) as 
rank_num
-                             |FROM tmp1
-                             |
+                             |select tmp1.c1, tmp1.c2 from tmp1
+                             |left join tmp2 on (
+                             |  tmp1.c1 = regexp_extract(tmp2.c1, 
'(?<=@)[^.]+(?=\.)', 0)
+                             |  or tmp2.c1 > 10
+                             |)
                              |""".stripMargin)
         df.collect()
         GlutenSuiteUtils.waitUntilEmpty(spark.sparkContext)
-        val sort = find(df.queryExecution.executedPlan) {
-          _.isInstanceOf[SortExec]
+
+        val nestedLoopJoin = find(df.queryExecution.executedPlan) {
+          _.isInstanceOf[BroadcastNestedLoopJoinExec]
         }
-        assert(sort.isDefined)
+        assert(nestedLoopJoin.isDefined)
         val fallbackReasons = events.flatMap(_.fallbackNodeToReason.values)
         assert(fallbackReasons.nonEmpty)
-        assert(
-          fallbackReasons.forall(
-            _.contains("[FallbackByUserOptions] Validation failed on node 
Sort")))
-      } finally {
-        spark.sparkContext.removeSparkListener(listener)
-      }
-    }
-  }
-
-  test("fallback when nested loop join has unsupported expression") {
-    val events = new ArrayBuffer[GlutenPlanFallbackEvent]
-    val listener = new SparkListener {
-      override def onOtherEvent(event: SparkListenerEvent): Unit = {
-        event match {
-          case e: GlutenPlanFallbackEvent => events.append(e)
-          case _ =>
-        }
-      }
-    }
-    spark.sparkContext.addSparkListener(listener)
-
-    try {
-      val df = spark.sql("""
-                           |select tmp1.c1, tmp1.c2 from tmp1
-                           |left join tmp2 on (
-                           |  tmp1.c1 = regexp_extract(tmp2.c1, 
'(?<=@)[^.]+(?=\.)', 0)
-                           |  or tmp2.c1 > 10
-                           |)
-                           |""".stripMargin)
-      df.collect()
-      GlutenSuiteUtils.waitUntilEmpty(spark.sparkContext)
-
-      val nestedLoopJoin = find(df.queryExecution.executedPlan) {
-        _.isInstanceOf[BroadcastNestedLoopJoinExec]
-      }
-      assert(nestedLoopJoin.isDefined)
-      val fallbackReasons = events.flatMap(_.fallbackNodeToReason.values)
-      assert(fallbackReasons.nonEmpty)
-      assert(fallbackReasons.forall(_.contains("regexp_extract due to 
Pattern")))
-    } finally {
-      spark.sparkContext.removeSparkListener(listener)
+        assert(fallbackReasons.forall(_.contains("regexp_extract due to 
Pattern")))
     }
   }
 
@@ -397,31 +366,20 @@ class FallbackSuite extends 
VeloxWholeStageTransformerSuite with AdaptiveSparkPl
     // GlutenPlanFallbackEvent even when spark.gluten.enabled=false (e.g. the 
vanilla baseline run
     // inside runQueryAndCompare). All nodes would appear as fallback with the 
generic reason
     // "Gluten does not touch it or does not support it".
-    val events = new ArrayBuffer[GlutenPlanFallbackEvent]
-    val listener = new SparkListener {
-      override def onOtherEvent(event: SparkListenerEvent): Unit = {
-        event match {
-          case e: GlutenPlanFallbackEvent => events.append(e)
-          case _ =>
-        }
-      }
-    }
-    spark.sparkContext.addSparkListener(listener)
-    try {
-      // Execute a query with gluten disabled — this mimics what 
runQueryAndCompare does for the
-      // vanilla baseline run. No GlutenPlanFallbackEvent should be emitted at 
all.
-      withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "false") {
-        spark.sql("SELECT c1, count(*) FROM tmp1 GROUP BY c1").collect()
+    withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "false") {
+      GlutenSuiteUtils.withFallbackEventListener(spark.sparkContext) {
+        events =>
+          // Execute a query with gluten disabled — this mimics what 
runQueryAndCompare does for
+          // the vanilla baseline run. No GlutenPlanFallbackEvent should be 
emitted at all.
+          spark.sql("SELECT c1, count(*) FROM tmp1 GROUP BY c1").collect()
+          GlutenSuiteUtils.waitUntilEmpty(spark.sparkContext)
+          assert(
+            events.isEmpty,
+            s"Expected no GlutenPlanFallbackEvent for vanilla Spark execution, 
" +
+              s"but got ${events.size} event(s). " +
+              s"First event fallback reasons: 
${events.headOption.map(_.fallbackNodeToReason)}"
+          )
       }
-      GlutenSuiteUtils.waitUntilEmpty(spark.sparkContext)
-      assert(
-        events.isEmpty,
-        s"Expected no GlutenPlanFallbackEvent for vanilla Spark execution, " +
-          s"but got ${events.size} event(s). " +
-          s"First event fallback reasons: 
${events.headOption.map(_.fallbackNodeToReason)}"
-      )
-    } finally {
-      spark.sparkContext.removeSparkListener(listener)
     }
   }
 }
diff --git 
a/backends-velox/src/test/scala/org/apache/spark/utils/GlutenSuiteUtils.scala 
b/backends-velox/src/test/scala/org/apache/spark/utils/GlutenSuiteUtils.scala
index 1453a56072..602a5f4a30 100644
--- 
a/backends-velox/src/test/scala/org/apache/spark/utils/GlutenSuiteUtils.scala
+++ 
b/backends-velox/src/test/scala/org/apache/spark/utils/GlutenSuiteUtils.scala
@@ -16,8 +16,33 @@
  */
 package org.apache.spark.utils
 
+import org.apache.gluten.events.GlutenPlanFallbackEvent
+
 import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
+
+import scala.collection.mutable.ArrayBuffer
 
 object GlutenSuiteUtils {
   def waitUntilEmpty(ctx: SparkContext): Unit = 
ctx.listenerBus.waitUntilEmpty()
+
+  // Drain any pending events from previous tests before registering the 
listener.
+  // Spark's LiveListenerBus is async, so events posted but not yet dispatched 
would
+  // still be delivered to a listener added afterwards, contaminating the 
buffer.
+  // After the body finishes, callers should invoke waitUntilEmpty before 
reading
+  // the events buffer.
+  def withFallbackEventListener(ctx: SparkContext)(
+      body: ArrayBuffer[GlutenPlanFallbackEvent] => Unit): Unit = {
+    waitUntilEmpty(ctx)
+    val events = new ArrayBuffer[GlutenPlanFallbackEvent]
+    val listener = new SparkListener {
+      override def onOtherEvent(event: SparkListenerEvent): Unit = event match 
{
+        case e: GlutenPlanFallbackEvent => events.append(e)
+        case _ =>
+      }
+    }
+    ctx.addSparkListener(listener)
+    try body(events)
+    finally ctx.removeSparkListener(listener)
+  }
 }
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackTag.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackTag.scala
index 32020e68bc..f4702200e7 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackTag.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackTag.scala
@@ -22,6 +22,8 @@ import org.apache.spark.sql.execution.SparkPlan
 
 import org.apache.commons.lang3.exception.ExceptionUtils
 
+import scala.collection.immutable.HashSet
+
 sealed trait FallbackTag {
   val stacktrace: Option[String] =
     if (FallbackTags.DEBUG) {
@@ -33,8 +35,16 @@ sealed trait FallbackTag {
 
 object FallbackTag {
 
-  /** A tag that stores one reason text of fall back. */
-  case class Appendable(override val reason: String) extends FallbackTag
+  /** A tag that stores distinct reason texts of fall back. */
+  case class Appendable private (private val reasonSet: HashSet[String]) 
extends FallbackTag {
+    override def reason(): String = reasonSet.mkString("; ")
+
+    def append(other: Appendable): Appendable = Appendable(reasonSet ++ 
other.reasonSet)
+  }
+
+  object Appendable {
+    def apply(reason: String): Appendable = new Appendable(HashSet(reason))
+  }
 
   /**
    * A tag that stores reason text of fall back. Other reasons will be 
discarded when this tag is
@@ -93,7 +103,7 @@ object FallbackTags {
         case (exclusive: FallbackTag.Exclusive, _) =>
           exclusive
         case (l: FallbackTag.Appendable, r: FallbackTag.Appendable) =>
-          FallbackTag.Appendable(s"${l.reason}; ${r.reason}")
+          l.append(r)
       }
     mergedTagOption
       .foreach(mergedTag => plan.setTagValue(TAG, mergedTag))
diff --git 
a/gluten-core/src/test/scala/org/apache/gluten/extension/columnar/FallbackTagSuite.scala
 
b/gluten-core/src/test/scala/org/apache/gluten/extension/columnar/FallbackTagSuite.scala
new file mode 100644
index 0000000000..61408d91fd
--- /dev/null
+++ 
b/gluten-core/src/test/scala/org/apache/gluten/extension/columnar/FallbackTagSuite.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension.columnar
+
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+
+import org.scalatest.funsuite.AnyFunSuite
+
+class FallbackTagSuite extends AnyFunSuite {
+
+  test("repeated tagging with the same reason does not grow the reason 
string") {
+    // Simulates AQE re-running fallback rules across stages on the same 
logicalLink,
+    // or several physical nodes sharing one logicalLink with the same 
fallback reason
+    // (see GlutenFallbackReporter.printFallbackReason). Without dedup, the 
reason
+    // would grow unboundedly to "r; r; r; r; r".
+    val plan = LocalRelation()
+    val reason = "manually fallback by user"
+    (1 to 5).foreach(_ => FallbackTags.add(plan, reason))
+    assert(FallbackTags.get(plan).reason() == reason)
+  }
+
+  test("distinct Appendable reasons are still concatenated") {
+    val plan = LocalRelation()
+    FallbackTags.add(plan, "reason A")
+    FallbackTags.add(plan, "reason B")
+    assert(FallbackTags.get(plan).reason().split("; ").toSet == Set("reason 
A", "reason B"))
+  }
+
+  test("Appendable reasons with substring overlap are still distinct") {
+    val plan = LocalRelation()
+    FallbackTags.add(plan, "reason A")
+    FallbackTags.add(plan, "reason")
+    assert(FallbackTags.get(plan).reason().split("; ").toSet == Set("reason 
A", "reason"))
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to