This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 580db626f1 [CORE] Test trait `WithQueryPlanListener` for ensuring no
fallback query plan node conveniently (#9802)
580db626f1 is described below
commit 580db626f1db649c5b4b6a90280f1eb9dd2d4f20
Author: Hongze Zhang <[email protected]>
AuthorDate: Thu May 29 18:35:10 2025 +0100
[CORE] Test trait `WithQueryPlanListener` for ensuring no fallback query
plan node conveniently (#9802)
---
.../org/apache/gluten/test/FallbackUtil.scala | 10 +-
.../apache/spark/sql/WithQueryPlanListener.scala | 136 +++++++++++++++++++++
.../org/apache/spark/sql/GlutenSQLQuerySuite.scala | 11 +-
3 files changed, 153 insertions(+), 4 deletions(-)
diff --git
a/gluten-substrait/src/test/scala/org/apache/gluten/test/FallbackUtil.scala
b/gluten-substrait/src/test/scala/org/apache/gluten/test/FallbackUtil.scala
index 2a0ecaf92c..2af0f06c34 100644
--- a/gluten-substrait/src/test/scala/org/apache/gluten/test/FallbackUtil.scala
+++ b/gluten-substrait/src/test/scala/org/apache/gluten/test/FallbackUtil.scala
@@ -30,7 +30,7 @@ import
org.apache.spark.sql.execution.exchange.ReusedExchangeExec
object FallbackUtil extends Logging with AdaptiveSparkPlanHelper {
- def skip(plan: SparkPlan): Boolean = {
+ def tolerate(plan: SparkPlan): Boolean = {
plan match {
case _: ColumnarToRowTransition =>
true
@@ -67,9 +67,13 @@ object FallbackUtil extends Logging with
AdaptiveSparkPlanHelper {
}
def hasFallback(plan: SparkPlan): Boolean = {
- val fallbackOperator = collectWithSubqueries(plan) { case plan => plan
}.filterNot(
- plan => plan.isInstanceOf[GlutenPlan] || skip(plan))
+ val fallbackOperator =
+ collectWithSubqueries(plan) { case plan => plan }.filter(plan =>
nodeHasFallback(plan))
fallbackOperator.foreach(operator => log.info(s"gluten fallback
operator:{$operator}"))
fallbackOperator.nonEmpty
}
+
+ /**
+  * Tells whether a single plan node counts as a fallback: it is neither a `GlutenPlan` nor a
+  * node accepted by `tolerate`.
+  */
+ def nodeHasFallback(plan: SparkPlan): Boolean = {
+   plan match {
+     case _: GlutenPlan => false
+     case other => !tolerate(other)
+   }
+ }
}
diff --git
a/gluten-substrait/src/test/scala/org/apache/spark/sql/WithQueryPlanListener.scala
b/gluten-substrait/src/test/scala/org/apache/spark/sql/WithQueryPlanListener.scala
new file mode 100644
index 0000000000..a23360fd9d
--- /dev/null
+++
b/gluten-substrait/src/test/scala/org/apache/spark/sql/WithQueryPlanListener.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+
+import org.apache.gluten.test.FallbackUtil.collectWithSubqueries
+
+import org.apache.spark.sql.WithQueryPlanListener.ListenerImpl
+import org.apache.spark.sql.execution.{QueryExecution, SparkPlan,
SparkPlanTest}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.util.QueryExecutionListener
+
+import org.scalactic.source.Position
+import org.scalatest.Tag
+import org.scalatest.funsuite.AnyFunSuiteLike
+
+import scala.collection.mutable
+import scala.reflect.ClassTag
+// format: off
+/**
+ * A trait that registers a listener to collect all the query plans executed while running a
+ * single test case.
+ *
+ * Once the test body has finished, every collected plan is handed to the checks configured
+ * through `planListeners`, so a suite can verify the executed plans of all of its queries.
+ * If the test body throws, the checks are not run for that test.
+ */
+// format: on
+trait WithQueryPlanListener extends SharedSparkSession with AnyFunSuiteLike {
+
+  assert(
+    !this.isInstanceOf[SparkPlanTest],
+    "WithQueryPlanListener should not be mixed in with SparkPlanTest as it doesn't intercept " +
+      "the `SparkPlanTest.checkAnswer` calls"
+  )
+
+  /** Registry of plan checks; configure it in the suite body before the tests run. */
+  protected val planListeners: WithQueryPlanListener.QueryPlanListeners =
+    new WithQueryPlanListener.QueryPlanListeners()
+
+  /**
+   * Registers the test so that the configured plan checks run right after the test body,
+   * inside the test itself, which makes a failing check fail that test.
+   */
+  override def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
+      pos: Position): Unit = {
+    super.test(testName, testTags: _*)({
+      testFun
+      // Spark documents that QueryExecutionListener callbacks may be invoked from other
+      // threads, so drain the listener bus first; otherwise the plans of the last queries
+      // may not have been recorded yet and would silently escape the checks.
+      spark.sparkContext.listenerBus.waitUntilEmpty()
+      listeners().foreach(_.invokeAll())
+    })(pos)
+  }
+
+  /** Installs a fresh collecting listener carrying the checks configured so far. */
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    spark.sessionState.listenerManager.register(new ListenerImpl(planListeners.collect()))
+  }
+
+  /** Removes the collecting listeners installed by this trait. */
+  override def afterEach(): Unit = {
+    try {
+      val manager = spark.sessionState.listenerManager
+      listeners().foreach(listener => manager.unregister(listener))
+    } finally {
+      // Always let the parent clean up, even if unregistering failed.
+      super.afterEach()
+    }
+  }
+
+  /** The collecting listeners of this trait currently registered on the session. */
+  private def listeners() = {
+    spark.sessionState.listenerManager
+      .listListeners()
+      .collect { case listener: ListenerImpl => listener }
+      .toSeq
+  }
+}
+
+object WithQueryPlanListener {
+
+  /**
+   * A listener that collects all the executed query plans while running a single test case.
+   *
+   * Plans are only recorded in `onSuccess`; the checks in `listeners` run later, when
+   * `invokeAll` is called, so a failing check is raised on the caller's thread.
+   *
+   * @param listeners the checks to apply to every recorded plan
+   */
+  private class ListenerImpl(val listeners: Seq[QueryPlanListener])
+    extends QueryExecutionListener
+    with AdaptiveSparkPlanHelper {
+    // Guarded by `plans.synchronized`: the callback and `invokeAll` may run on different threads.
+    private val plans = mutable.ArrayBuffer[SparkPlan]()
+
+    /** Records the executed plan of a successfully finished query. */
+    override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
+      val plan = qe.executedPlan
+      plans.synchronized {
+        plans += plan
+      }
+    }
+
+    /** Failed queries are ignored. */
+    override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {}
+
+    /**
+     * Applies every check to every recorded plan, and empties the record.
+     *
+     * The loop repeats until nothing is left, so plans recorded while the checks are
+     * running are processed as well.
+     */
+    def invokeAll(): Unit = {
+      var batch = drain()
+      while (batch.nonEmpty) {
+        batch.foreach(plan => listeners.foreach(listener => listener.apply(plan)))
+        batch = drain()
+      }
+    }
+
+    /**
+     * Atomically removes and returns the recorded plans.
+     *
+     * `toList` is used to take a real copy: on Scala 2.12 `ArrayBuffer.toSeq` returns the
+     * buffer itself, so clearing the buffer would also empty the returned batch.
+     */
+    private def drain(): Seq[SparkPlan] = plans.synchronized {
+      val snapshot = plans.toList
+      plans.clear()
+      snapshot
+    }
+  }
+
+  /** A chainable registry of the checks to run against every executed query plan. */
+  class QueryPlanListeners private[WithQueryPlanListener] {
+    private val builder = mutable.ArrayBuffer[QueryPlanListener]()
+
+    /** Returns a stable snapshot (a copy) of the checks registered so far. */
+    private[WithQueryPlanListener] def collect(): Seq[QueryPlanListener] = {
+      builder.toList
+    }
+
+    /**
+     * Registers an arbitrary check.
+     *
+     * @param listener function that receives the root of each executed plan
+     * @return this registry, for chaining
+     */
+    def onPlanExecuted(listener: QueryPlanListener): QueryPlanListeners = {
+      builder += listener
+      this
+    }
+
+    /**
+     * Registers a check that fails when any node of an executed plan, subqueries included,
+     * satisfies `prediction`.
+     *
+     * @param prediction returns true for a node that must not appear in the plan
+     * @return this registry, for chaining
+     */
+    def assertNoNodeExists(prediction: SparkPlan => Boolean): QueryPlanListeners = {
+      builder += {
+        planRoot =>
+          val allNodes = collectWithSubqueries(planRoot) { case p => p }
+          allNodes.foreach {
+            node =>
+              assert(
+                !prediction(node),
+                s"Found a node ${node.nodeName} in the query plan that fails the assertion: " +
+                  s"${planRoot.toString}")
+          }
+      }
+      this
+    }
+
+    /**
+     * Registers a check that fails when an executed plan contains a node whose class is `T`
+     * or a subclass of `T`.
+     *
+     * @tparam T the forbidden plan node type
+     * @return this registry, for chaining
+     */
+    def assertNoInstanceOf[T: ClassTag]: QueryPlanListeners = {
+      val clazz = implicitly[ClassTag[T]].runtimeClass
+      assertNoNodeExists(n => clazz.isAssignableFrom(n.getClass))
+    }
+  }
+
+  /** A check applied to the root of an executed plan; it signals failure by throwing. */
+  type QueryPlanListener = SparkPlan => Unit
+}
diff --git
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSQLQuerySuite.scala
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSQLQuerySuite.scala
index bd9699e400..48c57e0c2e 100644
---
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSQLQuerySuite.scala
+++
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSQLQuerySuite.scala
@@ -17,12 +17,21 @@
package org.apache.spark.sql
import org.apache.spark.SparkException
+import org.apache.spark.sql.execution.SortExec
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.internal.SQLConf
-class GlutenSQLQuerySuite extends SQLQuerySuite with GlutenSQLTestsTrait {
+class GlutenSQLQuerySuite
+ extends SQLQuerySuite
+ with GlutenSQLTestsTrait
+ with WithQueryPlanListener {
import testImplicits._
+ // Assert no fallbacks on the trivially supported operators.
+ // Only sort fallback is checked at the moment, since fallbacks
+ // still exist for other operators.
+ planListeners.assertNoInstanceOf[SortExec]
+
testGluten("SPARK-28156: self-join should not miss cached view") {
withTable("table1") {
withView("table1_vw") {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]