This is an automated email from the ASF dual-hosted git repository.
marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 04697ec5ec [GLUTEN-11368][VL] Enable failed AQE UT (#11411)
04697ec5ec is described below
commit 04697ec5ecb6cadf878f4e1ca9c7896336a2d05b
Author: Rong Ma <[email protected]>
AuthorDate: Fri Jan 16 13:56:42 2026 +0000
[GLUTEN-11368][VL] Enable failed AQE UT (#11411)
---
.../velox/VeloxAdaptiveQueryExecSuite.scala | 9 ++---
.../velox/VeloxAdaptiveQueryExecSuite.scala | 9 ++---
.../velox/VeloxAdaptiveQueryExecSuite.scala | 9 ++---
.../velox/VeloxAdaptiveQueryExecSuite.scala | 9 ++---
.../velox/VeloxAdaptiveQueryExecSuite.scala | 30 +++++++++--------
.../velox/VeloxAdaptiveQueryExecSuite.scala | 38 +++++++++++++---------
6 files changed, 58 insertions(+), 46 deletions(-)
diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
index 69da588c35..618dc07431 100644
--- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
+++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
@@ -755,7 +755,9 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
assert(read.metrics("numPartitions").value == read.partitionSpecs.length)
assert(read.metrics("partitionDataSize").value > 0)
- withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "300") {
+ // Gluten has smaller shuffle data size, and the right side is materialized before the left
+ // side. Need to lower the threshold to avoid the planner broadcasting the right side first.
+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40") {
val (_, adaptivePlan) = runAdaptiveAndVerifyResult(
"SELECT * FROM testData join testData2 ON key = a where value = '1'")
val join = collect(adaptivePlan) { case j: BroadcastHashJoinExecTransformerBase => j }.head
@@ -847,8 +849,7 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
}
}
- // FIXME
- ignoreGluten("SPARK-33551: Do not use AQE shuffle read for repartition") {
+ testGluten("SPARK-33551: Do not use AQE shuffle read for repartition") {
def hasRepartitionShuffle(plan: SparkPlan): Boolean = {
find(plan) {
case s: ShuffleExchangeLike =>
@@ -897,7 +898,7 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
df.collect()
val plan = df.queryExecution.executedPlan
assert(hasRepartitionShuffle(plan) == !optimizeOutRepartition)
- val smj = findTopLevelSortMergeJoin(plan)
+ val smj = findTopLevelSortMergeJoinTransform(plan)
assert(smj.length == 1)
assert(smj.head.isSkewJoin == optimizeSkewJoin)
val aqeReads = collect(smj.head) { case c: AQEShuffleReadExec => c }
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
index 33ed9d2e41..0c5eae6e58 100644
--- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
+++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
@@ -755,7 +755,9 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
assert(read.metrics("numPartitions").value == read.partitionSpecs.length)
assert(read.metrics("partitionDataSize").value > 0)
- withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "300") {
+ // Gluten has smaller shuffle data size, and the right side is materialized before the left
+ // side. Need to lower the threshold to avoid the planner broadcasting the right side first.
+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40") {
val (_, adaptivePlan) = runAdaptiveAndVerifyResult(
"SELECT * FROM testData join testData2 ON key = a where value = '1'")
val join = collect(adaptivePlan) { case j: BroadcastHashJoinExecTransformerBase => j }.head
@@ -849,8 +851,7 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
}
}
- // FIXME
- ignoreGluten("SPARK-33551: Do not use AQE shuffle read for repartition") {
+ testGluten("SPARK-33551: Do not use AQE shuffle read for repartition") {
def hasRepartitionShuffle(plan: SparkPlan): Boolean = {
find(plan) {
case s: ShuffleExchangeLike =>
@@ -899,7 +900,7 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
df.collect()
val plan = df.queryExecution.executedPlan
assert(hasRepartitionShuffle(plan) == !optimizeOutRepartition)
- val smj = findTopLevelSortMergeJoin(plan)
+ val smj = findTopLevelSortMergeJoinTransform(plan)
assert(smj.length == 1)
assert(smj.head.isSkewJoin == optimizeSkewJoin)
val aqeReads = collect(smj.head) { case c: AQEShuffleReadExec => c }
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
index e8263863b4..6acdfed419 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
@@ -753,7 +753,9 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
assert(read.metrics("numPartitions").value == read.partitionSpecs.length)
assert(read.metrics("partitionDataSize").value > 0)
- withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "300") {
+ // Gluten has smaller shuffle data size, and the right side is materialized before the left
+ // side. Need to lower the threshold to avoid the planner broadcasting the right side first.
+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40") {
val (_, adaptivePlan) = runAdaptiveAndVerifyResult(
"SELECT * FROM testData join testData2 ON key = a where value = '1'")
val join = collect(adaptivePlan) { case j: BroadcastHashJoinExecTransformerBase => j }.head
@@ -847,8 +849,7 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
}
}
- // FIXME
- ignoreGluten("SPARK-33551: Do not use AQE shuffle read for repartition") {
+ testGluten("SPARK-33551: Do not use AQE shuffle read for repartition") {
def hasRepartitionShuffle(plan: SparkPlan): Boolean = {
find(plan) {
case s: ShuffleExchangeLike =>
@@ -897,7 +898,7 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
df.collect()
val plan = df.queryExecution.executedPlan
assert(hasRepartitionShuffle(plan) == !optimizeOutRepartition)
- val smj = findTopLevelSortMergeJoin(plan)
+ val smj = findTopLevelSortMergeJoinTransform(plan)
assert(smj.length == 1)
assert(smj.head.isSkewJoin == optimizeSkewJoin)
val aqeReads = collect(smj.head) { case c: AQEShuffleReadExec => c }
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
index e8263863b4..6acdfed419 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
@@ -753,7 +753,9 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
assert(read.metrics("numPartitions").value == read.partitionSpecs.length)
assert(read.metrics("partitionDataSize").value > 0)
- withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "300") {
+ // Gluten has smaller shuffle data size, and the right side is materialized before the left
+ // side. Need to lower the threshold to avoid the planner broadcasting the right side first.
+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40") {
val (_, adaptivePlan) = runAdaptiveAndVerifyResult(
"SELECT * FROM testData join testData2 ON key = a where value = '1'")
val join = collect(adaptivePlan) { case j: BroadcastHashJoinExecTransformerBase => j }.head
@@ -847,8 +849,7 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
}
}
- // FIXME
- ignoreGluten("SPARK-33551: Do not use AQE shuffle read for repartition") {
+ testGluten("SPARK-33551: Do not use AQE shuffle read for repartition") {
def hasRepartitionShuffle(plan: SparkPlan): Boolean = {
find(plan) {
case s: ShuffleExchangeLike =>
@@ -897,7 +898,7 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
df.collect()
val plan = df.queryExecution.executedPlan
assert(hasRepartitionShuffle(plan) == !optimizeOutRepartition)
- val smj = findTopLevelSortMergeJoin(plan)
+ val smj = findTopLevelSortMergeJoinTransform(plan)
assert(smj.length == 1)
assert(smj.head.isSkewJoin == optimizeSkewJoin)
val aqeReads = collect(smj.head) { case c: AQEShuffleReadExec => c }
diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
index a246e037ab..67e9baed04 100644
--- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
+++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.adaptive.velox
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase, ColumnarToCarrierRowExecBase, ShuffledHashJoinExecTransformerBase, SortExecTransformer, SortMergeJoinExecTransformer}
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.{Dataset, GlutenSQLTestsTrait, Row}
import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST
@@ -718,8 +718,7 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
}
}
- // FIXME
- ignoreGluten("SPARK-34682: AQEShuffleReadExec operating on canonicalized
plan") {
+ testGluten("SPARK-34682: AQEShuffleReadExec operating on canonicalized
plan") {
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
val (_, adaptivePlan) = runAdaptiveAndVerifyResult("SELECT key FROM testData GROUP BY key")
val reads = collect(adaptivePlan) { case r: AQEShuffleReadExec => r }
@@ -727,16 +726,18 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
val read = reads.head
val c = read.canonicalized.asInstanceOf[AQEShuffleReadExec]
// we can't just call execute() because that has separate checks for canonicalized plans
- val ex = intercept[IllegalStateException] {
- val doExecute = PrivateMethod[Unit](Symbol("doExecuteColumnar"))
- c.invokePrivate(doExecute())
- }
- assert(ex.getMessage === "operating on canonicalized plan")
+ checkError(
+ exception = intercept[SparkException] {
+ val doExecute = PrivateMethod[Unit](Symbol("doExecute"))
+ c.invokePrivate(doExecute())
+ },
+ condition = "INTERNAL_ERROR",
+ parameters = Map("message" -> "operating on canonicalized plan")
+ )
}
}
- // FIXME
- ignoreGluten("metrics of the shuffle read") {
+ testGluten("metrics of the shuffle read") {
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
SQLConf.SHUFFLE_PARTITIONS.key -> "5") {
@@ -756,7 +757,9 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
assert(read.metrics("numPartitions").value == read.partitionSpecs.length)
assert(read.metrics("partitionDataSize").value > 0)
- withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "300") {
+ // Gluten has smaller shuffle data size, and the right side is materialized before the left
+ // side. Need to lower the threshold to avoid the planner broadcasting the right side first.
+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40") {
val (_, adaptivePlan) = runAdaptiveAndVerifyResult(
"SELECT * FROM testData join testData2 ON key = a where value = '1'")
val join = collect(adaptivePlan) { case j: BroadcastHashJoinExecTransformerBase => j }.head
@@ -850,8 +853,7 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
}
}
- // FIXME
- ignoreGluten("SPARK-33551: Do not use AQE shuffle read for repartition") {
+ testGluten("SPARK-33551: Do not use AQE shuffle read for repartition") {
def hasRepartitionShuffle(plan: SparkPlan): Boolean = {
find(plan) {
case s: ShuffleExchangeLike =>
@@ -900,7 +902,7 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
df.collect()
val plan = df.queryExecution.executedPlan
assert(hasRepartitionShuffle(plan) == !optimizeOutRepartition)
- val smj = findTopLevelSortMergeJoin(plan)
+ val smj = findTopLevelSortMergeJoinTransform(plan)
assert(smj.length == 1)
assert(smj.head.isSkewJoin == optimizeSkewJoin)
val aqeReads = collect(smj.head) { case c: AQEShuffleReadExec => c }
diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
index a246e037ab..6ebefbe15a 100644
--- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
+++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/adaptive/velox/VeloxAdaptiveQueryExecSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.adaptive.velox
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.{BroadcastHashJoinExecTransformerBase, ColumnarToCarrierRowExecBase, ShuffledHashJoinExecTransformerBase, SortExecTransformer, SortMergeJoinExecTransformer}
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.{Dataset, GlutenSQLTestsTrait, Row}
import org.apache.spark.sql.GlutenTestConstants.GLUTEN_TEST
@@ -718,8 +718,7 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
}
}
- // FIXME
- ignoreGluten("SPARK-34682: AQEShuffleReadExec operating on canonicalized
plan") {
+ testGluten("SPARK-34682: AQEShuffleReadExec operating on canonicalized
plan") {
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
val (_, adaptivePlan) = runAdaptiveAndVerifyResult("SELECT key FROM testData GROUP BY key")
val reads = collect(adaptivePlan) { case r: AQEShuffleReadExec => r }
@@ -727,16 +726,18 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
val read = reads.head
val c = read.canonicalized.asInstanceOf[AQEShuffleReadExec]
// we can't just call execute() because that has separate checks for canonicalized plans
- val ex = intercept[IllegalStateException] {
- val doExecute = PrivateMethod[Unit](Symbol("doExecuteColumnar"))
- c.invokePrivate(doExecute())
- }
- assert(ex.getMessage === "operating on canonicalized plan")
+ checkError(
+ exception = intercept[SparkException] {
+ val doExecute = PrivateMethod[Unit](Symbol("doExecute"))
+ c.invokePrivate(doExecute())
+ },
+ condition = "INTERNAL_ERROR",
+ parameters = Map("message" -> "operating on canonicalized plan")
+ )
}
}
- // FIXME
- ignoreGluten("metrics of the shuffle read") {
+ testGluten("metrics of the shuffle read") {
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
SQLConf.SHUFFLE_PARTITIONS.key -> "5") {
@@ -750,13 +751,16 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
assert(
read.metrics.keys.toSeq.sorted == Seq(
"numCoalescedPartitions",
+ "numEmptyPartitions",
"numPartitions",
"partitionDataSize"))
assert(read.metrics("numCoalescedPartitions").value == 1)
assert(read.metrics("numPartitions").value == read.partitionSpecs.length)
assert(read.metrics("partitionDataSize").value > 0)
- withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "300") {
+ // Gluten has smaller shuffle data size, and the right side is materialized before the left
+ // side. Need to lower the threshold to avoid the planner broadcasting the right side first.
+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "40") {
val (_, adaptivePlan) = runAdaptiveAndVerifyResult(
"SELECT * FROM testData join testData2 ON key = a where value = '1'")
val join = collect(adaptivePlan) { case j: BroadcastHashJoinExecTransformerBase => j }.head
@@ -766,7 +770,7 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
assert(reads.length == 1)
val read = reads.head
assert(read.isLocalRead)
- assert(read.metrics.keys.toSeq == Seq("numPartitions"))
+ assert(read.metrics.keys.toSeq == Seq("numPartitions", "numEmptyPartitions"))
assert(read.metrics("numPartitions").value == read.partitionSpecs.length)
}
@@ -850,8 +854,7 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
}
}
- // FIXME
- ignoreGluten("SPARK-33551: Do not use AQE shuffle read for repartition") {
+ testGluten("SPARK-33551: Do not use AQE shuffle read for repartition") {
def hasRepartitionShuffle(plan: SparkPlan): Boolean = {
find(plan) {
case s: ShuffleExchangeLike =>
@@ -900,7 +903,7 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
df.collect()
val plan = df.queryExecution.executedPlan
assert(hasRepartitionShuffle(plan) == !optimizeOutRepartition)
- val smj = findTopLevelSortMergeJoin(plan)
+ val smj = findTopLevelSortMergeJoinTransform(plan)
assert(smj.length == 1)
assert(smj.head.isSkewJoin == optimizeSkewJoin)
val aqeReads = collect(smj.head) { case c: AQEShuffleReadExec => c }
@@ -914,7 +917,10 @@ class VeloxAdaptiveQueryExecSuite extends AdaptiveQueryExecSuite with GlutenSQLT
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
- SQLConf.SHUFFLE_PARTITIONS.key -> "5") {
+ SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+ // SPARK-52777: Disabling cleanup as the test assertions depend on them
+ SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED.key -> "false"
+ ) {
val df = sql("""
|SELECT * FROM (
| SELECT * FROM testData WHERE key = 1
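
As context for the threshold change above, below is a minimal standalone
Scala sketch (not part of the commit) of the broadcast-threshold behavior
the new test comments describe. The local SparkSession setup, object name,
and sample rows are illustrative assumptions; only the table names and the
query mirror the suite.

// Sketch: how spark.sql.autoBroadcastJoinThreshold steers broadcast-side
// selection. A join side is a broadcast candidate only if its estimated
// size is at or below the threshold, so lowering it from 300 to 40 bytes
// keeps the earlier-materialized right side from being broadcast first.
import org.apache.spark.sql.SparkSession

object BroadcastThresholdSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("broadcast-threshold-sketch")
      .getOrCreate()
    import spark.implicits._

    // Tiny stand-ins for the suite's testData/testData2 views.
    Seq((1, "1"), (2, "2"), (3, "3")).toDF("key", "value")
      .createOrReplaceTempView("testData")
    Seq((1, 1), (2, 2)).toDF("a", "b")
      .createOrReplaceTempView("testData2")

    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "40")
    spark.sql("SELECT * FROM testData JOIN testData2 ON key = a WHERE value = '1'")
      .explain() // shows which join strategy and build side were chosen
    spark.stop()
  }
}

Under adaptive execution the same threshold is re-checked against runtime
shuffle statistics, which is why, per the commit's comment, the Gluten
suites with their smaller shuffle data sizes need the lower value to
exercise the intended plan.
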
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]