This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 580db626f1 [CORE] Test trait `WithQueryPlanListener` for ensuring no 
fallback query plan node conveniently (#9802)
580db626f1 is described below

commit 580db626f1db649c5b4b6a90280f1eb9dd2d4f20
Author: Hongze Zhang <[email protected]>
AuthorDate: Thu May 29 18:35:10 2025 +0100

    [CORE] Test trait `WithQueryPlanListener` for ensuring no fallback query 
plan node conveniently (#9802)
---
 .../org/apache/gluten/test/FallbackUtil.scala      |  10 +-
 .../apache/spark/sql/WithQueryPlanListener.scala   | 136 +++++++++++++++++++++
 .../org/apache/spark/sql/GlutenSQLQuerySuite.scala |  11 +-
 3 files changed, 153 insertions(+), 4 deletions(-)

diff --git 
a/gluten-substrait/src/test/scala/org/apache/gluten/test/FallbackUtil.scala 
b/gluten-substrait/src/test/scala/org/apache/gluten/test/FallbackUtil.scala
index 2a0ecaf92c..2af0f06c34 100644
--- a/gluten-substrait/src/test/scala/org/apache/gluten/test/FallbackUtil.scala
+++ b/gluten-substrait/src/test/scala/org/apache/gluten/test/FallbackUtil.scala
@@ -30,7 +30,7 @@ import 
org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 
 object FallbackUtil extends Logging with AdaptiveSparkPlanHelper {
 
-  def skip(plan: SparkPlan): Boolean = {
+  def tolerate(plan: SparkPlan): Boolean = {
     plan match {
       case _: ColumnarToRowTransition =>
         true
@@ -67,9 +67,13 @@ object FallbackUtil extends Logging with 
AdaptiveSparkPlanHelper {
   }
 
   def hasFallback(plan: SparkPlan): Boolean = {
-    val fallbackOperator = collectWithSubqueries(plan) { case plan => plan 
}.filterNot(
-      plan => plan.isInstanceOf[GlutenPlan] || skip(plan))
+    val fallbackOperator =
+      collectWithSubqueries(plan) { case plan => plan }.filter(plan => 
nodeHasFallback(plan))
     fallbackOperator.foreach(operator => log.info(s"gluten fallback 
operator:{$operator}"))
     fallbackOperator.nonEmpty
   }
+
+  def nodeHasFallback(plan: SparkPlan): Boolean = {
+    !plan.isInstanceOf[GlutenPlan] && !tolerate(plan)
+  }
 }
diff --git 
a/gluten-substrait/src/test/scala/org/apache/spark/sql/WithQueryPlanListener.scala
 
b/gluten-substrait/src/test/scala/org/apache/spark/sql/WithQueryPlanListener.scala
new file mode 100644
index 0000000000..a23360fd9d
--- /dev/null
+++ 
b/gluten-substrait/src/test/scala/org/apache/spark/sql/WithQueryPlanListener.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import org.apache.gluten.test.FallbackUtil.collectWithSubqueries
+
+import org.apache.spark.sql.WithQueryPlanListener.ListenerImpl
+import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, 
SparkPlanTest}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.util.QueryExecutionListener
+
+import org.scalactic.source.Position
+import org.scalatest.Tag
+import org.scalatest.funsuite.AnyFunSuiteLike
+
+import scala.collection.mutable
+import scala.reflect.ClassTag
+// format: off
+/**
+ * A trait that registers listeners to collect all the executed query plans 
while running a single
+ * test case.
+ *
+ * The collected query plans can be used to verify the correctness of the 
query execution plan.
+ */
+// format: on
+trait WithQueryPlanListener extends SharedSparkSession with AnyFunSuiteLike {
+
+  assert(
+    !this.isInstanceOf[SparkPlanTest],
+    "WithQueryPlanListener should not be mixed in with SparkPlanTest as it 
doesn't intercept " +
+      "the `SparkPlanTest.checkAnswer` calls"
+  )
+
+  protected val planListeners: WithQueryPlanListener.QueryPlanListeners =
+    new WithQueryPlanListener.QueryPlanListeners()
+
+  override def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
+      pos: Position): Unit = {
+    super.test(testName, testTags: _*)({
+      testFun
+      listeners().foreach(_.invokeAll())
+    })(pos)
+  }
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    spark.sessionState.listenerManager.register(new 
ListenerImpl(planListeners.collect()))
+  }
+
+  override def afterEach(): Unit = {
+    listeners().foreach(listener => 
spark.sessionState.listenerManager.unregister(listener))
+    super.afterEach()
+  }
+
+  private def listeners() = {
+    spark.sessionState.listenerManager
+      .listListeners()
+      .filter(_.isInstanceOf[ListenerImpl])
+      .map(_.asInstanceOf[ListenerImpl])
+      .toSeq
+  }
+}
+
+object WithQueryPlanListener {
+  // A listener that collects all the executed query plans while running a 
single test case.
+  private class ListenerImpl(val listeners: Seq[QueryPlanListener])
+    extends QueryExecutionListener
+    with AdaptiveSparkPlanHelper {
+    private val plans = mutable.ArrayBuffer[SparkPlan]()
+
+    override def onSuccess(funcName: String, qe: QueryExecution, durationNs: 
Long): Unit = {
+      val plan = qe.executedPlan
+      plans += plan
+    }
+
+    override def onFailure(funcName: String, qe: QueryExecution, exception: 
Exception): Unit = {}
+
+    def invokeAll(): Unit = {
+      while (plans.nonEmpty) {
+        val popped = plans.toSeq
+        plans.clear()
+        popped.foreach(plan => listeners.foreach(listener => 
listener.apply(plan)))
+      }
+    }
+  }
+
+  class QueryPlanListeners private[WithQueryPlanListener] {
+    private val builder = mutable.ArrayBuffer[QueryPlanListener]()
+
+    private[WithQueryPlanListener] def collect(): Seq[QueryPlanListener] = {
+      builder.toSeq
+    }
+
+    def onPlanExecuted(listener: QueryPlanListener): QueryPlanListeners = {
+      builder += listener
+      this
+    }
+
+    def assertNoNodeExists(prediction: SparkPlan => Boolean): 
QueryPlanListeners = {
+      builder += {
+        planRoot =>
+          val allNodes = collectWithSubqueries(planRoot) { case p => p }
+          allNodes.foreach {
+            node =>
+              assert(
+                !prediction(node),
+                s"Found a node ${node.nodeName} in the query plan that fails 
the assertion: " +
+                  s"${planRoot.toString}")
+          }
+      }
+      this
+    }
+
+    def assertNoInstanceOf[T: ClassTag]: QueryPlanListeners = {
+      val clazz = implicitly[ClassTag[T]].runtimeClass
+      assertNoNodeExists(n => clazz.isAssignableFrom(n.getClass))
+    }
+  }
+
+  type QueryPlanListener = SparkPlan => Unit
+}
diff --git 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSQLQuerySuite.scala
 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSQLQuerySuite.scala
index bd9699e400..48c57e0c2e 100644
--- 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSQLQuerySuite.scala
+++ 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSQLQuerySuite.scala
@@ -17,12 +17,21 @@
 package org.apache.spark.sql
 
 import org.apache.spark.SparkException
+import org.apache.spark.sql.execution.SortExec
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.internal.SQLConf
 
-class GlutenSQLQuerySuite extends SQLQuerySuite with GlutenSQLTestsTrait {
+class GlutenSQLQuerySuite
+  extends SQLQuerySuite
+  with GlutenSQLTestsTrait
+  with WithQueryPlanListener {
   import testImplicits._
 
+  // Assert no fallbacks on the trivial supported operators.
+  // We only check for sort fallback at the moment since there
+  // are fallbacks that exist for other operators.
+  planListeners.assertNoInstanceOf[SortExec]
+
   testGluten("SPARK-28156: self-join should not miss cached view") {
     withTable("table1") {
       withView("table1_vw") {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to