This is an automated email from the ASF dual-hosted git repository.

marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 04697ec5ec [GLUTEN-11368][VL] Enable failed AQE UT (#11411)
04697ec5ec is described below

commit 04697ec5ecb6cadf878f4e1ca9c7896336a2d05b
Author: Rong Ma <[email protected]>
AuthorDate: Fri Jan 16 13:56:42 2026 +0000

    [GLUTEN-11368][VL] Enable failed AQE UT (#11411)
---
 .../velox/VeloxAdaptiveQueryExecSuite.scala        |  9 ++---
 .../velox/VeloxAdaptiveQueryExecSuite.scala        |  9 ++---
 .../velox/VeloxAdaptiveQueryExecSuite.scala        |  9 ++---
 .../velox/VeloxAdaptiveQueryExecSuite.scala        |  9 ++---
 .../velox/VeloxAdaptiveQueryExecSuite.scala        | 30 +++++++++--------
 .../velox/VeloxAdaptiveQueryExecSuite.scala        | 38 +++++++++++++---------
 6 files changed, 58 insertions(+), 46 deletions(-)

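For context before the hunks: most of this change flips tests from
ignoreGluten to testGluten, re-enabling cases that had been skipped with a
bare FIXME. Those helpers come from Gluten's test traits (the suites mix in
GlutenSQLTestsTrait and import GlutenTestConstants.GLUTEN_TEST); the sketch
below is an illustrative assumption about their shape, not the actual Gluten
implementation:

    import org.scalatest.funsuite.AnyFunSuite

    trait GlutenTestHelpersSketch { self: AnyFunSuite =>
      // Assumed prefix; the real constant is GlutenTestConstants.GLUTEN_TEST,
      // used so Gluten overrides don't collide with same-named Spark tests.
      private val GLUTEN_TEST = "Gluten - "

      // Registers and runs a Gluten-specific version of a test.
      def testGluten(name: String)(body: => Unit): Unit =
        test(GLUTEN_TEST + name)(body)

      // Registers the test but skips it, keeping it visible in reports.
      def ignoreGluten(name: String)(body: => Unit): Unit =
        ignore(GLUTEN_TEST + name)(body)
    }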
diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
index 69da588c35..618dc07431 100644
--- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
+++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
@@ -755,7 +755,9 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
       assert(read.metrics("numPartitions").value == read.partitionSpecs.length)
       assert(read.metrics("partitionDataSize").value > 0)
 
-      withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "300") {
+      // Gluten has smaller shuffle data size, and the right side is materialized before the left
+      // side. Need to lower the threshold to avoid the planner broadcasting the right side first.
+      withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40") {
         val (_, adaptivePlan) = runAdaptiveAndVerifyResult(
           "SELECT * FROM testData join testData2 ON key = a where value = '1'")
        val join = collect(adaptivePlan) { case j: BroadcastHashJoinExecTransformerBase => j }.head
@@ -847,8 +849,7 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
     }
   }
 
-  // FIXME
-  ignoreGluten("SPARK-33551: Do not use AQE shuffle read for repartition") {
+  testGluten("SPARK-33551: Do not use AQE shuffle read for repartition") {
     def hasRepartitionShuffle(plan: SparkPlan): Boolean = {
       find(plan) {
         case s: ShuffleExchangeLike =>
@@ -897,7 +898,7 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
       df.collect()
       val plan = df.queryExecution.executedPlan
       assert(hasRepartitionShuffle(plan) == !optimizeOutRepartition)
-      val smj = findTopLevelSortMergeJoin(plan)
+      val smj = findTopLevelSortMergeJoinTransform(plan)
       assert(smj.length == 1)
       assert(smj.head.isSkewJoin == optimizeSkewJoin)
       val aqeReads = collect(smj.head) { case c: AQEShuffleReadExec => c }
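A note on the threshold change above: with AQE on, Spark plans the join first
and only demotes it to a broadcast join once a side's runtime shuffle size
falls under spark.sql.autoBroadcastJoinThreshold. As the new comment says,
Gluten's columnar shuffle writes less data and the right side materializes
before the left, so at 300 bytes the planner would broadcast the right side
first and change the plan shape the assertions expect; 40 bytes avoids that.
A minimal standalone sketch of the same setup (the table contents here are
assumptions, not the suite's actual fixtures):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[2]").getOrCreate()
    import spark.implicits._

    Seq((1, "1"), (2, "2")).toDF("key", "value").createOrReplaceTempView("testData")
    Seq((1, 1), (2, 2)).toDF("a", "b").createOrReplaceTempView("testData2")

    spark.conf.set("spark.sql.adaptive.enabled", "true")
    // The byte threshold controls which side AQE may demote to a broadcast.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "40")
    spark.sql("SELECT * FROM testData JOIN testData2 ON key = a WHERE value = '1'").show()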
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
index 33ed9d2e41..0c5eae6e58 100644
--- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
+++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
@@ -755,7 +755,9 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
       assert(read.metrics("numPartitions").value == read.partitionSpecs.length)
       assert(read.metrics("partitionDataSize").value > 0)
 
-      withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "300") {
+      // Gluten has smaller shuffle data size, and the right side is materialized before the left
+      // side. Need to lower the threshold to avoid the planner broadcasting the right side first.
+      withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40") {
         val (_, adaptivePlan) = runAdaptiveAndVerifyResult(
           "SELECT * FROM testData join testData2 ON key = a where value = '1'")
        val join = collect(adaptivePlan) { case j: BroadcastHashJoinExecTransformerBase => j }.head
@@ -849,8 +851,7 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
     }
   }
 
-  // FIXME
-  ignoreGluten("SPARK-33551: Do not use AQE shuffle read for repartition") {
+  testGluten("SPARK-33551: Do not use AQE shuffle read for repartition") {
     def hasRepartitionShuffle(plan: SparkPlan): Boolean = {
       find(plan) {
         case s: ShuffleExchangeLike =>
@@ -899,7 +900,7 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
       df.collect()
       val plan = df.queryExecution.executedPlan
       assert(hasRepartitionShuffle(plan) == !optimizeOutRepartition)
-      val smj = findTopLevelSortMergeJoin(plan)
+      val smj = findTopLevelSortMergeJoinTransform(plan)
       assert(smj.length == 1)
       assert(smj.head.isSkewJoin == optimizeSkewJoin)
       val aqeReads = collect(smj.head) { case c: AQEShuffleReadExec => c }
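The findTopLevelSortMergeJoin to findTopLevelSortMergeJoinTransform swap in
these suites reflects that once Gluten offloads the join, the executed plan
contains SortMergeJoinExecTransformer nodes rather than Spark's
SortMergeJoinExec, so the vanilla finder matches nothing. A plausible sketch
of the Gluten-side finder (the real helper lives in the suite hierarchy;
collect comes from Spark's AdaptiveSparkPlanHelper):

    import org.apache.gluten.execution.SortMergeJoinExecTransformer
    import org.apache.spark.sql.execution.SparkPlan
    import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper

    object PlanFinderSketch extends AdaptiveSparkPlanHelper {
      // Collects every offloaded sort-merge join in the adaptive plan tree.
      def findTopLevelSortMergeJoinTransform(
          plan: SparkPlan): Seq[SortMergeJoinExecTransformer] =
        collect(plan) { case j: SortMergeJoinExecTransformer => j }
    }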
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
index e8263863b4..6acdfed419 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
@@ -753,7 +753,9 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
       assert(read.metrics("numPartitions").value == read.partitionSpecs.length)
       assert(read.metrics("partitionDataSize").value > 0)
 
-      withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "300") {
+      // Gluten has smaller shuffle data size, and the right side is materialized before the left
+      // side. Need to lower the threshold to avoid the planner broadcasting the right side first.
+      withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40") {
         val (_, adaptivePlan) = runAdaptiveAndVerifyResult(
           "SELECT * FROM testData join testData2 ON key = a where value = '1'")
        val join = collect(adaptivePlan) { case j: BroadcastHashJoinExecTransformerBase => j }.head
@@ -847,8 +849,7 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
     }
   }
 
-  // FIXME
-  ignoreGluten("SPARK-33551: Do not use AQE shuffle read for repartition") {
+  testGluten("SPARK-33551: Do not use AQE shuffle read for repartition") {
     def hasRepartitionShuffle(plan: SparkPlan): Boolean = {
       find(plan) {
         case s: ShuffleExchangeLike =>
@@ -897,7 +898,7 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
       df.collect()
       val plan = df.queryExecution.executedPlan
       assert(hasRepartitionShuffle(plan) == !optimizeOutRepartition)
-      val smj = findTopLevelSortMergeJoin(plan)
+      val smj = findTopLevelSortMergeJoinTransform(plan)
       assert(smj.length == 1)
       assert(smj.head.isSkewJoin == optimizeSkewJoin)
       val aqeReads = collect(smj.head) { case c: AQEShuffleReadExec => c }
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
index e8263863b4..6acdfed419 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
@@ -753,7 +753,9 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
       assert(read.metrics("numPartitions").value == read.partitionSpecs.length)
       assert(read.metrics("partitionDataSize").value > 0)
 
-      withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "300") {
+      // Gluten has smaller shuffle data size, and the right side is materialized before the left
+      // side. Need to lower the threshold to avoid the planner broadcasting the right side first.
+      withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40") {
         val (_, adaptivePlan) = runAdaptiveAndVerifyResult(
           "SELECT * FROM testData join testData2 ON key = a where value = '1'")
        val join = collect(adaptivePlan) { case j: BroadcastHashJoinExecTransformerBase => j }.head
@@ -847,8 +849,7 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
     }
   }
 
-  // FIXME
-  ignoreGluten("SPARK-33551: Do not use AQE shuffle read for repartition") {
+  testGluten("SPARK-33551: Do not use AQE shuffle read for repartition") {
     def hasRepartitionShuffle(plan: SparkPlan): Boolean = {
       find(plan) {
         case s: ShuffleExchangeLike =>
@@ -897,7 +898,7 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
       df.collect()
       val plan = df.queryExecution.executedPlan
       assert(hasRepartitionShuffle(plan) == !optimizeOutRepartition)
-      val smj = findTopLevelSortMergeJoin(plan)
+      val smj = findTopLevelSortMergeJoinTransform(plan)
       assert(smj.length == 1)
       assert(smj.head.isSkewJoin == optimizeSkewJoin)
       val aqeReads = collect(smj.head) { case c: AQEShuffleReadExec => c }
diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
index a246e037ab..67e9baed04 100644
--- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
+++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.adaptive.velox
 import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase, ColumnarToCarrierRowExecBase, ShuffledHashJoinExecTransformerBase, SortExecTransformer, SortMergeJoinExecTransformer}
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
 import org.apache.spark.sql.{Dataset, GlutenSQLTestsTrait, Row}
 import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST
@@ -718,8 +718,7 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
     }
   }
 
-  // FIXME
-  ignoreGluten("SPARK-34682: AQEShuffleReadExec operating on canonicalized 
plan") {
+  testGluten("SPARK-34682: AQEShuffleReadExec operating on canonicalized 
plan") {
     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
       val (_, adaptivePlan) = runAdaptiveAndVerifyResult("SELECT key FROM 
testData GROUP BY key")
       val reads = collect(adaptivePlan) { case r: AQEShuffleReadExec => r }
@@ -727,16 +726,18 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
       val read = reads.head
       val c = read.canonicalized.asInstanceOf[AQEShuffleReadExec]
       // we can't just call execute() because that has separate checks for canonicalized plans
-      val ex = intercept[IllegalStateException] {
-        val doExecute = PrivateMethod[Unit](Symbol("doExecuteColumnar"))
-        c.invokePrivate(doExecute())
-      }
-      assert(ex.getMessage === "operating on canonicalized plan")
+      checkError(
+        exception = intercept[SparkException] {
+          val doExecute = PrivateMethod[Unit](Symbol("doExecute"))
+          c.invokePrivate(doExecute())
+        },
+        condition = "INTERNAL_ERROR",
+        parameters = Map("message" -> "operating on canonicalized plan")
+      )
     }
   }
 
-  // FIXME
-  ignoreGluten("metrics of the shuffle read") {
+  testGluten("metrics of the shuffle read") {
     withSQLConf(
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.SHUFFLE_PARTITIONS.key -> "5") {
@@ -756,7 +757,9 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
       assert(read.metrics("numPartitions").value == read.partitionSpecs.length)
       assert(read.metrics("partitionDataSize").value > 0)
 
-      withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "300") {
+      // Gluten has smaller shuffle data size, and the right side is materialized before the left
+      // side. Need to lower the threshold to avoid the planner broadcasting the right side first.
+      withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40") {
         val (_, adaptivePlan) = runAdaptiveAndVerifyResult(
           "SELECT * FROM testData join testData2 ON key = a where value = '1'")
        val join = collect(adaptivePlan) { case j: BroadcastHashJoinExecTransformerBase => j }.head
@@ -850,8 +853,7 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
     }
   }
 
-  // FIXME
-  ignoreGluten("SPARK-33551: Do not use AQE shuffle read for repartition") {
+  testGluten("SPARK-33551: Do not use AQE shuffle read for repartition") {
     def hasRepartitionShuffle(plan: SparkPlan): Boolean = {
       find(plan) {
         case s: ShuffleExchangeLike =>
@@ -900,7 +902,7 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
       df.collect()
       val plan = df.queryExecution.executedPlan
       assert(hasRepartitionShuffle(plan) == !optimizeOutRepartition)
-      val smj = findTopLevelSortMergeJoin(plan)
+      val smj = findTopLevelSortMergeJoinTransform(plan)
       assert(smj.length == 1)
       assert(smj.head.isSkewJoin == optimizeSkewJoin)
       val aqeReads = collect(smj.head) { case c: AQEShuffleReadExec => c }
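On the SPARK-34682 rewrite above: in Spark 4.x, executing a canonicalized
plan surfaces as a SparkException with error class INTERNAL_ERROR (raised via
SparkException.internalError) instead of a bare IllegalStateException, and
the private method to poke is doExecute rather than doExecuteColumnar, hence
the move to checkError. A minimal sketch of the contract being asserted,
assuming Spark 4.x behavior; this is not Gluten code:

    import org.apache.spark.SparkException
    import org.scalatest.Assertions._

    val ex = intercept[SparkException] {
      // Stand-in for what the AQE node does on a canonicalized plan.
      throw SparkException.internalError("operating on canonicalized plan")
    }
    // checkError in the suite matches the class plus the message parameter.
    assert(ex.getErrorClass == "INTERNAL_ERROR")
    assert(ex.getMessage.contains("operating on canonicalized plan"))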
diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
index a246e037ab..6ebefbe15a 100644
--- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
+++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.adaptive.velox
 import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase, ColumnarToCarrierRowExecBase, ShuffledHashJoinExecTransformerBase, SortExecTransformer, SortMergeJoinExecTransformer}
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
 import org.apache.spark.sql.{Dataset, GlutenSQLTestsTrait, Row}
 import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST
@@ -718,8 +718,7 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
     }
   }
 
-  // FIXME
-  ignoreGluten("SPARK-34682: AQEShuffleReadExec operating on canonicalized 
plan") {
+  testGluten("SPARK-34682: AQEShuffleReadExec operating on canonicalized 
plan") {
     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
       val (_, adaptivePlan) = runAdaptiveAndVerifyResult("SELECT key FROM testData GROUP BY key")
       val reads = collect(adaptivePlan) { case r: AQEShuffleReadExec => r }
@@ -727,16 +726,18 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
       val read = reads.head
       val c = read.canonicalized.asInstanceOf[AQEShuffleReadExec]
       // we can't just call execute() because that has separate checks for canonicalized plans
-      val ex = intercept[IllegalStateException] {
-        val doExecute = PrivateMethod[Unit](Symbol("doExecuteColumnar"))
-        c.invokePrivate(doExecute())
-      }
-      assert(ex.getMessage === "operating on canonicalized plan")
+      checkError(
+        exception = intercept[SparkException] {
+          val doExecute = PrivateMethod[Unit](Symbol("doExecute"))
+          c.invokePrivate(doExecute())
+        },
+        condition = "INTERNAL_ERROR",
+        parameters = Map("message" -> "operating on canonicalized plan")
+      )
     }
   }
 
-  // FIXME
-  ignoreGluten("metrics of the shuffle read") {
+  testGluten("metrics of the shuffle read") {
     withSQLConf(
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.SHUFFLE_PARTITIONS.key -> "5") {
@@ -750,13 +751,16 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
       assert(
         read.metrics.keys.toSeq.sorted == Seq(
           "numCoalescedPartitions",
+          "numEmptyPartitions",
           "numPartitions",
           "partitionDataSize"))
       assert(read.metrics("numCoalescedPartitions").value == 1)
       assert(read.metrics("numPartitions").value == read.partitionSpecs.length)
       assert(read.metrics("partitionDataSize").value > 0)
 
-      withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "300") {
+      // Gluten has smaller shuffle data size, and the right side is materialized before the left
+      // side. Need to lower the threshold to avoid the planner broadcasting the right side first.
+      withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40") {
         val (_, adaptivePlan) = runAdaptiveAndVerifyResult(
           "SELECT * FROM testData join testData2 ON key = a where value = '1'")
        val join = collect(adaptivePlan) { case j: BroadcastHashJoinExecTransformerBase => j }.head
@@ -766,7 +770,7 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
         assert(reads.length == 1)
         val read = reads.head
         assert(read.isLocalRead)
-        assert(read.metrics.keys.toSeq == Seq("numPartitions"))
+        assert(read.metrics.keys.toSeq == Seq("numPartitions", 
"numEmptyPartitions"))
         assert(read.metrics("numPartitions").value == 
read.partitionSpecs.length)
       }
 
@@ -850,8 +854,7 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
     }
   }
 
-  // FIXME
-  ignoreGluten("SPARK-33551: Do not use AQE shuffle read for repartition") {
+  testGluten("SPARK-33551: Do not use AQE shuffle read for repartition") {
     def hasRepartitionShuffle(plan: SparkPlan): Boolean = {
       find(plan) {
         case s: ShuffleExchangeLike =>
@@ -900,7 +903,7 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
       df.collect()
       val plan = df.queryExecution.executedPlan
       assert(hasRepartitionShuffle(plan) == !optimizeOutRepartition)
-      val smj = findTopLevelSortMergeJoin(plan)
+      val smj = findTopLevelSortMergeJoinTransform(plan)
       assert(smj.length == 1)
       assert(smj.head.isSkewJoin == optimizeSkewJoin)
       val aqeReads = collect(smj.head) { case c: AQEShuffleReadExec => c }
@@ -914,7 +917,10 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
 
     withSQLConf(
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
-      SQLConf.SHUFFLE_PARTITIONS.key -> "5") {
+      SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+      // SPARK-52777: Disabling cleanup as the test assertions depend on them
+      SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED.key -> "false"
+    ) {
       val df = sql("""
                      |SELECT * FROM (
                      |  SELECT * FROM testData WHERE key = 1

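One more note on the spark41 diff: the last hunk also pins
SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED to false. Per the
in-diff comment (SPARK-52777), Spark 4.1 can clean up shuffle dependency
files eagerly, which would invalidate assertions that inspect shuffle output
after the job finishes. A minimal sketch of scoping that override, assuming a
suite that mixes in Spark's SQLTestUtils (the source of withSQLConf and sql):

    import org.apache.spark.sql.internal.SQLConf

    withSQLConf(
      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
      SQLConf.SHUFFLE_PARTITIONS.key -> "5",
      // SPARK-52777: keep shuffle files around for post-run assertions.
      SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED.key -> "false"
    ) {
      val df = sql("SELECT * FROM testData WHERE key = 1")
      df.collect()
    }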

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
