This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 9dd6fd5ef chore: Ignore Spark SQL WholeStageCodegenSuite tests (#1859)
9dd6fd5ef is described below
commit 9dd6fd5ef045df98b6ceeeafd7680bbf7e8ef892
Author: Andy Grove <[email protected]>
AuthorDate: Sat Jun 7 09:47:30 2025 -0600
chore: Ignore Spark SQL WholeStageCodegenSuite tests (#1859)
---
dev/diffs/3.4.3.diff | 154 +++-------------------------
dev/diffs/3.5.4.diff | 79 +++------------
dev/diffs/3.5.5.diff | 96 ++++--------------
dev/diffs/4.0.0-preview1.diff | 229 +++---------------------------------------
4 files changed, 62 insertions(+), 496 deletions(-)
diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff
index bf44e3d7a..18e91d9da 100644
--- a/dev/diffs/3.4.3.diff
+++ b/dev/diffs/3.4.3.diff
@@ -1305,153 +1305,27 @@ index b14f4a405f6..ab7baf434a5 100644
spark.range(1).foreach { _ =>
columnarToRowExec.canonicalized
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
-index ac710c32296..baae214c6ee 100644
+index ac710c32296..2854b433dd3 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
-@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
+@@ -17,7 +17,7 @@
+
+ package org.apache.spark.sql.execution
- import org.apache.spark.sql.{Dataset, QueryTest, Row, SaveMode}
+-import org.apache.spark.sql.{Dataset, QueryTest, Row, SaveMode}
++import org.apache.spark.sql.{Dataset, IgnoreCometSuite, QueryTest, Row,
SaveMode}
import org.apache.spark.sql.catalyst.expressions.codegen.{ByteCodeStats,
CodeAndComment, CodeGenerator}
-+import org.apache.spark.sql.comet.{CometHashJoinExec, CometSortMergeJoinExec}
import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec,
SortAggregateExec}
- import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
-@@ -169,6 +170,7 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
- val oneJoinDF = df1.join(df2.hint("SHUFFLE_HASH"), $"k1" === $"k2")
- assert(oneJoinDF.queryExecution.executedPlan.collect {
- case WholeStageCodegenExec(_ : ShuffledHashJoinExec) => true
-+ case _: CometHashJoinExec => true
- }.size === 1)
- checkAnswer(oneJoinDF, Seq(Row(0, 0), Row(1, 1), Row(2, 2), Row(3, 3),
Row(4, 4)))
-
-@@ -177,6 +179,7 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
- .join(df3.hint("SHUFFLE_HASH"), $"k1" === $"k3")
- assert(twoJoinsDF.queryExecution.executedPlan.collect {
- case WholeStageCodegenExec(_ : ShuffledHashJoinExec) => true
-+ case _: CometHashJoinExec => true
- }.size === 2)
- checkAnswer(twoJoinsDF,
- Seq(Row(0, 0, 0), Row(1, 1, 1), Row(2, 2, 2), Row(3, 3, 3), Row(4, 4,
4)))
-@@ -193,6 +196,8 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
- assert(joinUniqueDF.queryExecution.executedPlan.collect {
- case WholeStageCodegenExec(_ : ShuffledHashJoinExec) if hint ==
"SHUFFLE_HASH" => true
- case WholeStageCodegenExec(_ : SortMergeJoinExec) if hint ==
"SHUFFLE_MERGE" => true
-+ case _: CometHashJoinExec if hint == "SHUFFLE_HASH" => true
-+ case _: CometSortMergeJoinExec if hint == "SHUFFLE_MERGE" => true
- }.size === 1)
- checkAnswer(joinUniqueDF, Seq(Row(0, 0), Row(1, 1), Row(2, 2), Row(3,
3), Row(4, 4),
- Row(null, 5), Row(null, 6), Row(null, 7), Row(null, 8), Row(null, 9)))
-@@ -203,6 +208,8 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
- assert(joinNonUniqueDF.queryExecution.executedPlan.collect {
- case WholeStageCodegenExec(_ : ShuffledHashJoinExec) if hint ==
"SHUFFLE_HASH" => true
- case WholeStageCodegenExec(_ : SortMergeJoinExec) if hint ==
"SHUFFLE_MERGE" => true
-+ case _: CometHashJoinExec if hint == "SHUFFLE_HASH" => true
-+ case _: CometSortMergeJoinExec if hint == "SHUFFLE_MERGE" => true
- }.size === 1)
- checkAnswer(joinNonUniqueDF, Seq(Row(0, 0), Row(0, 3), Row(0, 6),
Row(0, 9), Row(1, 1),
- Row(1, 4), Row(1, 7), Row(2, 2), Row(2, 5), Row(2, 8), Row(3, null),
Row(4, null)))
-@@ -213,6 +220,8 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
- assert(joinWithNonEquiDF.queryExecution.executedPlan.collect {
- case WholeStageCodegenExec(_ : ShuffledHashJoinExec) if hint ==
"SHUFFLE_HASH" => true
- case WholeStageCodegenExec(_ : SortMergeJoinExec) if hint ==
"SHUFFLE_MERGE" => true
-+ case _: CometHashJoinExec if hint == "SHUFFLE_HASH" => true
-+ case _: CometSortMergeJoinExec if hint == "SHUFFLE_MERGE" => true
- }.size === 1)
- checkAnswer(joinWithNonEquiDF, Seq(Row(0, 0), Row(0, 6), Row(0, 9),
Row(1, 1),
- Row(1, 7), Row(2, 2), Row(2, 8), Row(3, null), Row(4, null),
Row(null, 3), Row(null, 4),
-@@ -224,6 +233,8 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
- assert(twoJoinsDF.queryExecution.executedPlan.collect {
- case WholeStageCodegenExec(_ : ShuffledHashJoinExec) if hint ==
"SHUFFLE_HASH" => true
- case WholeStageCodegenExec(_ : SortMergeJoinExec) if hint ==
"SHUFFLE_MERGE" => true
-+ case _: CometHashJoinExec if hint == "SHUFFLE_HASH" => true
-+ case _: CometSortMergeJoinExec if hint == "SHUFFLE_MERGE" => true
- }.size === 2)
- checkAnswer(twoJoinsDF,
- Seq(Row(0, 0, 0), Row(1, 1, null), Row(2, 2, 2), Row(3, 3, null),
Row(4, 4, null),
-@@ -241,6 +252,7 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
- val oneLeftOuterJoinDF = df1.join(df2.hint("SHUFFLE_MERGE"), $"k1" ===
$"k2", "left_outer")
- assert(oneLeftOuterJoinDF.queryExecution.executedPlan.collect {
- case WholeStageCodegenExec(_ : SortMergeJoinExec) => true
-+ case _: CometSortMergeJoinExec => true
- }.size === 1)
- checkAnswer(oneLeftOuterJoinDF, Seq(Row(0, 0), Row(1, 1), Row(2, 2),
Row(3, 3), Row(4, null),
- Row(5, null), Row(6, null), Row(7, null), Row(8, null), Row(9, null)))
-@@ -249,6 +261,7 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
- val oneRightOuterJoinDF = df2.join(df3.hint("SHUFFLE_MERGE"), $"k2" ===
$"k3", "right_outer")
- assert(oneRightOuterJoinDF.queryExecution.executedPlan.collect {
- case WholeStageCodegenExec(_ : SortMergeJoinExec) => true
-+ case _: CometSortMergeJoinExec => true
- }.size === 1)
- checkAnswer(oneRightOuterJoinDF, Seq(Row(0, 0), Row(1, 1), Row(2, 2),
Row(3, 3), Row(null, 4),
- Row(null, 5)))
-@@ -258,6 +271,7 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
- .join(df1.hint("SHUFFLE_MERGE"), $"k3" === $"k1", "right_outer")
- assert(twoJoinsDF.queryExecution.executedPlan.collect {
- case WholeStageCodegenExec(_ : SortMergeJoinExec) => true
-+ case _: CometSortMergeJoinExec => true
- }.size === 2)
- checkAnswer(twoJoinsDF,
- Seq(Row(0, 0, 0), Row(1, 1, 1), Row(2, 2, 2), Row(3, 3, 3), Row(4,
null, 4), Row(5, null, 5),
-@@ -273,6 +287,7 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
- val oneJoinDF = df1.join(df2.hint("SHUFFLE_MERGE"), $"k1" === $"k2",
"left_semi")
- assert(oneJoinDF.queryExecution.executedPlan.collect {
- case WholeStageCodegenExec(ProjectExec(_, _ : SortMergeJoinExec)) =>
true
-+ case _: CometSortMergeJoinExec => true
- }.size === 1)
- checkAnswer(oneJoinDF, Seq(Row(0), Row(1), Row(2), Row(3)))
-
-@@ -280,8 +295,8 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
- val twoJoinsDF = df3.join(df2.hint("SHUFFLE_MERGE"), $"k3" === $"k2",
"left_semi")
- .join(df1.hint("SHUFFLE_MERGE"), $"k3" === $"k1", "left_semi")
- assert(twoJoinsDF.queryExecution.executedPlan.collect {
-- case WholeStageCodegenExec(ProjectExec(_, _ : SortMergeJoinExec)) |
-- WholeStageCodegenExec(_ : SortMergeJoinExec) => true
-+ case _: SortMergeJoinExec => true
-+ case _: CometSortMergeJoinExec => true
- }.size === 2)
- checkAnswer(twoJoinsDF, Seq(Row(0), Row(1), Row(2), Row(3)))
- }
-@@ -295,6 +310,7 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
- val oneJoinDF = df1.join(df2.hint("SHUFFLE_MERGE"), $"k1" === $"k2",
"left_anti")
- assert(oneJoinDF.queryExecution.executedPlan.collect {
- case WholeStageCodegenExec(ProjectExec(_, _ : SortMergeJoinExec)) =>
true
-+ case _: CometSortMergeJoinExec => true
- }.size === 1)
- checkAnswer(oneJoinDF, Seq(Row(4), Row(5), Row(6), Row(7), Row(8),
Row(9)))
-
-@@ -302,8 +318,8 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
- val twoJoinsDF = df1.join(df2.hint("SHUFFLE_MERGE"), $"k1" === $"k2",
"left_anti")
- .join(df3.hint("SHUFFLE_MERGE"), $"k1" === $"k3", "left_anti")
- assert(twoJoinsDF.queryExecution.executedPlan.collect {
-- case WholeStageCodegenExec(ProjectExec(_, _ : SortMergeJoinExec)) |
-- WholeStageCodegenExec(_ : SortMergeJoinExec) => true
-+ case _: SortMergeJoinExec => true
-+ case _: CometSortMergeJoinExec => true
- }.size === 2)
- checkAnswer(twoJoinsDF, Seq(Row(6), Row(7), Row(8), Row(9)))
- }
-@@ -433,10 +449,6 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
-
- test("Sort should be included in WholeStageCodegen") {
- val df = spark.range(3, 0, -1).toDF().sort(col("id"))
-- val plan = df.queryExecution.executedPlan
-- assert(plan.exists(p =>
-- p.isInstanceOf[WholeStageCodegenExec] &&
-- p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[SortExec]))
- assert(df.collect() === Array(Row(1), Row(2), Row(3)))
- }
-
-@@ -616,7 +628,9 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
- .write.mode(SaveMode.Overwrite).parquet(path)
-
- withSQLConf(SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> "255",
-- SQLConf.WHOLESTAGE_SPLIT_CONSUME_FUNC_BY_OPERATOR.key -> "true") {
-+ SQLConf.WHOLESTAGE_SPLIT_CONSUME_FUNC_BY_OPERATOR.key -> "true",
-+ // Disable Comet native execution because this checks wholestage
codegen.
-+ "spark.comet.exec.enabled" -> "false") {
- val projection = Seq.tabulate(columnNum)(i => s"c$i + c$i as
newC$i")
- val df = spark.read.parquet(path).selectExpr(projection: _*)
+@@ -29,7 +29,7 @@ import org.apache.spark.sql.test.SharedSparkSession
+ import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
+
+ // Disable AQE because the WholeStageCodegenExec is added when running
QueryStageExec
+-class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
++class WholeStageCodegenSuite extends QueryTest with SharedSparkSession with
IgnoreCometSuite
+ with DisableAdaptiveExecutionSuite {
+ import testImplicits._
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 593bd7bb4ba..32af28b0238 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
diff --git a/dev/diffs/3.5.4.diff b/dev/diffs/3.5.4.diff
index 8b7f05280..32d4d617e 100644
--- a/dev/diffs/3.5.4.diff
+++ b/dev/diffs/3.5.4.diff
@@ -1476,76 +1476,27 @@ index b14f4a405f6..ab7baf434a5 100644
spark.range(1).foreach { _ =>
columnarToRowExec.canonicalized
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
-index 5a413c77754..a6f97dccb67 100644
+index 5a413c77754..207b66e1d7b 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
-@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
- import org.apache.spark.sql.{Dataset, QueryTest, Row, SaveMode}
+@@ -17,7 +17,7 @@
+
+ package org.apache.spark.sql.execution
+
+-import org.apache.spark.sql.{Dataset, QueryTest, Row, SaveMode}
++import org.apache.spark.sql.{Dataset, IgnoreCometSuite, QueryTest, Row,
SaveMode}
import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
import org.apache.spark.sql.catalyst.expressions.codegen.{ByteCodeStats,
CodeAndComment, CodeGenerator}
-+import org.apache.spark.sql.comet.{CometSortExec, CometSortMergeJoinExec}
import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
- import org.apache.spark.sql.execution.aggregate.{HashAggregateExec,
SortAggregateExec}
- import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
-@@ -235,6 +236,7 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
- assert(twoJoinsDF.queryExecution.executedPlan.collect {
- case WholeStageCodegenExec(_ : ShuffledHashJoinExec) if hint ==
"SHUFFLE_HASH" => true
- case WholeStageCodegenExec(_ : SortMergeJoinExec) if hint ==
"SHUFFLE_MERGE" => true
-+ case _: CometSortMergeJoinExec if hint == "SHUFFLE_MERGE" => true
- }.size === 2)
- checkAnswer(twoJoinsDF,
- Seq(Row(0, 0, 0), Row(1, 1, null), Row(2, 2, 2), Row(3, 3, null),
Row(4, 4, null),
-@@ -358,6 +360,7 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
- .join(df1.hint("SHUFFLE_MERGE"), $"k3" === $"k1", "right_outer")
- assert(twoJoinsDF.queryExecution.executedPlan.collect {
- case WholeStageCodegenExec(_ : SortMergeJoinExec) => true
-+ case _: CometSortMergeJoinExec => true
- }.size === 2)
- checkAnswer(twoJoinsDF,
- Seq(Row(0, 0, 0), Row(1, 1, 1), Row(2, 2, 2), Row(3, 3, 3), Row(4,
null, 4), Row(5, null, 5),
-@@ -380,8 +383,7 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
- val twoJoinsDF = df3.join(df2.hint("SHUFFLE_MERGE"), $"k3" === $"k2",
"left_semi")
- .join(df1.hint("SHUFFLE_MERGE"), $"k3" === $"k1", "left_semi")
- assert(twoJoinsDF.queryExecution.executedPlan.collect {
-- case WholeStageCodegenExec(ProjectExec(_, _ : SortMergeJoinExec)) |
-- WholeStageCodegenExec(_ : SortMergeJoinExec) => true
-+ case _: SortMergeJoinExec => true
- }.size === 2)
- checkAnswer(twoJoinsDF, Seq(Row(0), Row(1), Row(2), Row(3)))
- }
-@@ -402,8 +404,7 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
- val twoJoinsDF = df1.join(df2.hint("SHUFFLE_MERGE"), $"k1" === $"k2",
"left_anti")
- .join(df3.hint("SHUFFLE_MERGE"), $"k1" === $"k3", "left_anti")
- assert(twoJoinsDF.queryExecution.executedPlan.collect {
-- case WholeStageCodegenExec(ProjectExec(_, _ : SortMergeJoinExec)) |
-- WholeStageCodegenExec(_ : SortMergeJoinExec) => true
-+ case _: SortMergeJoinExec => true
- }.size === 2)
- checkAnswer(twoJoinsDF, Seq(Row(6), Row(7), Row(8), Row(9)))
- }
-@@ -536,7 +537,10 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
- val plan = df.queryExecution.executedPlan
- assert(plan.exists(p =>
- p.isInstanceOf[WholeStageCodegenExec] &&
-- p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[SortExec]))
-+ p.asInstanceOf[WholeStageCodegenExec].collect {
-+ case _: SortExec => true
-+ case _: CometSortExec => true
-+ }.nonEmpty))
- assert(df.collect() === Array(Row(1), Row(2), Row(3)))
- }
-
-@@ -716,7 +720,9 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
- .write.mode(SaveMode.Overwrite).parquet(path)
-
- withSQLConf(SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> "255",
-- SQLConf.WHOLESTAGE_SPLIT_CONSUME_FUNC_BY_OPERATOR.key -> "true") {
-+ SQLConf.WHOLESTAGE_SPLIT_CONSUME_FUNC_BY_OPERATOR.key -> "true",
-+ // Disable Comet native execution because this checks wholestage
codegen.
-+ "spark.comet.exec.enabled" -> "false") {
- val projection = Seq.tabulate(columnNum)(i => s"c$i + c$i as
newC$i")
- val df = spark.read.parquet(path).selectExpr(projection: _*)
+@@ -30,7 +30,7 @@ import org.apache.spark.sql.test.SharedSparkSession
+ import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
+
+ // Disable AQE because the WholeStageCodegenExec is added when running
QueryStageExec
+-class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
++class WholeStageCodegenSuite extends QueryTest with SharedSparkSession with
IgnoreCometSuite
+ with DisableAdaptiveExecutionSuite {
+ import testImplicits._
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 2f8e401e743..a4f94417dcc 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
diff --git a/dev/diffs/3.5.5.diff b/dev/diffs/3.5.5.diff
index 26a51f6c4..9ca531087 100644
--- a/dev/diffs/3.5.5.diff
+++ b/dev/diffs/3.5.5.diff
@@ -1191,15 +1191,13 @@ index de24b8c82b0..1f835481290 100644
setupTestData()
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
-index 9e9d717db3b..c1a7caf56e0 100644
+index 9e9d717db3b..91a4f9a38d5 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
-@@ -17,7 +17,8 @@
-
+@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution
--import org.apache.spark.sql.{DataFrame, QueryTest, Row}
-+import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+ import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.spark.sql.comet.CometProjectExec
import org.apache.spark.sql.connector.SimpleWritableDataSource
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper,
DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
@@ -1216,15 +1214,6 @@ index 9e9d717db3b..c1a7caf56e0 100644
assert(actual == expected)
}
}
-@@ -112,7 +116,7 @@ abstract class RemoveRedundantProjectsSuiteBase
- assertProjectExec(query, 1, 3)
- }
-
-- test("join with ordering requirement") {
-+ test("join with ordering requirement") {
- val query = "select * from (select key, a, c, b from testView) as t1 join
" +
- "(select key, a, b, c from testView) as t2 on t1.key = t2.key where
t2.a > 50"
- assertProjectExec(query, 2, 2)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala
index 005e764cc30..92ec088efab 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantSortsSuite.scala
@@ -1292,76 +1281,27 @@ index b14f4a405f6..ab7baf434a5 100644
spark.range(1).foreach { _ =>
columnarToRowExec.canonicalized
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
-index 5a413c77754..a6f97dccb67 100644
+index 5a413c77754..207b66e1d7b 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
-@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
- import org.apache.spark.sql.{Dataset, QueryTest, Row, SaveMode}
+@@ -17,7 +17,7 @@
+
+ package org.apache.spark.sql.execution
+
+-import org.apache.spark.sql.{Dataset, QueryTest, Row, SaveMode}
++import org.apache.spark.sql.{Dataset, IgnoreCometSuite, QueryTest, Row,
SaveMode}
import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
import org.apache.spark.sql.catalyst.expressions.codegen.{ByteCodeStats,
CodeAndComment, CodeGenerator}
-+import org.apache.spark.sql.comet.{CometSortExec, CometSortMergeJoinExec}
import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
- import org.apache.spark.sql.execution.aggregate.{HashAggregateExec,
SortAggregateExec}
- import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
-@@ -235,6 +236,7 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
- assert(twoJoinsDF.queryExecution.executedPlan.collect {
- case WholeStageCodegenExec(_ : ShuffledHashJoinExec) if hint ==
"SHUFFLE_HASH" => true
- case WholeStageCodegenExec(_ : SortMergeJoinExec) if hint ==
"SHUFFLE_MERGE" => true
-+ case _: CometSortMergeJoinExec if hint == "SHUFFLE_MERGE" => true
- }.size === 2)
- checkAnswer(twoJoinsDF,
- Seq(Row(0, 0, 0), Row(1, 1, null), Row(2, 2, 2), Row(3, 3, null),
Row(4, 4, null),
-@@ -358,6 +360,7 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
- .join(df1.hint("SHUFFLE_MERGE"), $"k3" === $"k1", "right_outer")
- assert(twoJoinsDF.queryExecution.executedPlan.collect {
- case WholeStageCodegenExec(_ : SortMergeJoinExec) => true
-+ case _: CometSortMergeJoinExec => true
- }.size === 2)
- checkAnswer(twoJoinsDF,
- Seq(Row(0, 0, 0), Row(1, 1, 1), Row(2, 2, 2), Row(3, 3, 3), Row(4,
null, 4), Row(5, null, 5),
-@@ -380,8 +383,7 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
- val twoJoinsDF = df3.join(df2.hint("SHUFFLE_MERGE"), $"k3" === $"k2",
"left_semi")
- .join(df1.hint("SHUFFLE_MERGE"), $"k3" === $"k1", "left_semi")
- assert(twoJoinsDF.queryExecution.executedPlan.collect {
-- case WholeStageCodegenExec(ProjectExec(_, _ : SortMergeJoinExec)) |
-- WholeStageCodegenExec(_ : SortMergeJoinExec) => true
-+ case _: SortMergeJoinExec => true
- }.size === 2)
- checkAnswer(twoJoinsDF, Seq(Row(0), Row(1), Row(2), Row(3)))
- }
-@@ -402,8 +404,7 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
- val twoJoinsDF = df1.join(df2.hint("SHUFFLE_MERGE"), $"k1" === $"k2",
"left_anti")
- .join(df3.hint("SHUFFLE_MERGE"), $"k1" === $"k3", "left_anti")
- assert(twoJoinsDF.queryExecution.executedPlan.collect {
-- case WholeStageCodegenExec(ProjectExec(_, _ : SortMergeJoinExec)) |
-- WholeStageCodegenExec(_ : SortMergeJoinExec) => true
-+ case _: SortMergeJoinExec => true
- }.size === 2)
- checkAnswer(twoJoinsDF, Seq(Row(6), Row(7), Row(8), Row(9)))
- }
-@@ -536,7 +537,10 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
- val plan = df.queryExecution.executedPlan
- assert(plan.exists(p =>
- p.isInstanceOf[WholeStageCodegenExec] &&
-- p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[SortExec]))
-+ p.asInstanceOf[WholeStageCodegenExec].collect {
-+ case _: SortExec => true
-+ case _: CometSortExec => true
-+ }.nonEmpty))
- assert(df.collect() === Array(Row(1), Row(2), Row(3)))
- }
-
-@@ -716,7 +720,9 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
- .write.mode(SaveMode.Overwrite).parquet(path)
-
- withSQLConf(SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> "255",
-- SQLConf.WHOLESTAGE_SPLIT_CONSUME_FUNC_BY_OPERATOR.key -> "true") {
-+ SQLConf.WHOLESTAGE_SPLIT_CONSUME_FUNC_BY_OPERATOR.key -> "true",
-+ // Disable Comet native execution because this checks wholestage
codegen.
-+ "spark.comet.exec.enabled" -> "false") {
- val projection = Seq.tabulate(columnNum)(i => s"c$i + c$i as
newC$i")
- val df = spark.read.parquet(path).selectExpr(projection: _*)
+@@ -30,7 +30,7 @@ import org.apache.spark.sql.test.SharedSparkSession
+ import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
+
+ // Disable AQE because the WholeStageCodegenExec is added when running
QueryStageExec
+-class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
++class WholeStageCodegenSuite extends QueryTest with SharedSparkSession with
IgnoreCometSuite
+ with DisableAdaptiveExecutionSuite {
+ import testImplicits._
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 2f8e401e743..a4f94417dcc 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
diff --git a/dev/diffs/4.0.0-preview1.diff b/dev/diffs/4.0.0-preview1.diff
index 99cbd0325..2fec4297d 100644
--- a/dev/diffs/4.0.0-preview1.diff
+++ b/dev/diffs/4.0.0-preview1.diff
@@ -1476,226 +1476,27 @@ index 966f4e74712..8017e22d7f8 100644
spark.range(1).foreach { _ =>
columnarToRowExec.canonicalized
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
-index 3aaf61ffba4..4130ece2283 100644
+index 3aaf61ffba4..b9dd7090799 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala
-@@ -22,6 +22,7 @@ import org.apache.spark.rdd.MapPartitionsWithEvaluatorRDD
- import org.apache.spark.sql.{Dataset, QueryTest, Row, SaveMode}
+@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution
+
+ import org.apache.spark.SparkException
+ import org.apache.spark.rdd.MapPartitionsWithEvaluatorRDD
+-import org.apache.spark.sql.{Dataset, QueryTest, Row, SaveMode}
++import org.apache.spark.sql.{Dataset, IgnoreCometSuite, QueryTest, Row,
SaveMode}
import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
import org.apache.spark.sql.catalyst.expressions.codegen.{ByteCodeStats,
CodeAndComment, CodeGenerator}
-+import org.apache.spark.sql.comet.{CometHashJoinExec, CometSortExec,
CometSortMergeJoinExec}
import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
- import org.apache.spark.sql.execution.aggregate.{HashAggregateExec,
SortAggregateExec}
- import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
-@@ -172,6 +173,7 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
- val oneJoinDF = df1.join(df2.hint("SHUFFLE_HASH"), $"k1" === $"k2")
- assert(oneJoinDF.queryExecution.executedPlan.collect {
- case WholeStageCodegenExec(_ : ShuffledHashJoinExec) => true
-+ case _: CometHashJoinExec => true
- }.size === 1)
- checkAnswer(oneJoinDF, Seq(Row(0, 0), Row(1, 1), Row(2, 2), Row(3, 3),
Row(4, 4)))
-
-@@ -180,6 +182,7 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
- .join(df3.hint("SHUFFLE_HASH"), $"k1" === $"k3")
- assert(twoJoinsDF.queryExecution.executedPlan.collect {
- case WholeStageCodegenExec(_ : ShuffledHashJoinExec) => true
-+ case _: CometHashJoinExec => true
- }.size === 2)
- checkAnswer(twoJoinsDF,
- Seq(Row(0, 0, 0), Row(1, 1, 1), Row(2, 2, 2), Row(3, 3, 3), Row(4, 4,
4)))
-@@ -206,6 +209,8 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
- assert(joinUniqueDF.queryExecution.executedPlan.collect {
- case WholeStageCodegenExec(_ : ShuffledHashJoinExec) if hint ==
"SHUFFLE_HASH" => true
- case WholeStageCodegenExec(_ : SortMergeJoinExec) if hint ==
"SHUFFLE_MERGE" => true
-+ case _: CometHashJoinExec if hint == "SHUFFLE_HASH" => true
-+ case _: CometSortMergeJoinExec if hint == "SHUFFLE_MERGE" => true
- }.size === 1)
- checkAnswer(joinUniqueDF, Seq(Row(0, 0), Row(1, 1), Row(2, 2), Row(3,
3), Row(4, 4),
- Row(null, 5), Row(null, 6), Row(null, 7), Row(null, 8), Row(null, 9)))
-@@ -216,6 +221,8 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
- assert(joinNonUniqueDF.queryExecution.executedPlan.collect {
- case WholeStageCodegenExec(_ : ShuffledHashJoinExec) if hint ==
"SHUFFLE_HASH" => true
- case WholeStageCodegenExec(_ : SortMergeJoinExec) if hint ==
"SHUFFLE_MERGE" => true
-+ case _: CometHashJoinExec if hint == "SHUFFLE_HASH" => true
-+ case _: CometSortMergeJoinExec if hint == "SHUFFLE_MERGE" => true
- }.size === 1)
- checkAnswer(joinNonUniqueDF, Seq(Row(0, 0), Row(0, 3), Row(0, 6),
Row(0, 9), Row(1, 1),
- Row(1, 4), Row(1, 7), Row(2, 2), Row(2, 5), Row(2, 8), Row(3, null),
Row(4, null)))
-@@ -226,6 +233,8 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
- assert(joinWithNonEquiDF.queryExecution.executedPlan.collect {
- case WholeStageCodegenExec(_ : ShuffledHashJoinExec) if hint ==
"SHUFFLE_HASH" => true
- case WholeStageCodegenExec(_ : SortMergeJoinExec) if hint ==
"SHUFFLE_MERGE" => true
-+ case _: CometHashJoinExec if hint == "SHUFFLE_HASH" => true
-+ case _: CometSortMergeJoinExec if hint == "SHUFFLE_MERGE" => true
- }.size === 1)
- checkAnswer(joinWithNonEquiDF, Seq(Row(0, 0), Row(0, 6), Row(0, 9),
Row(1, 1),
- Row(1, 7), Row(2, 2), Row(2, 8), Row(3, null), Row(4, null),
Row(null, 3), Row(null, 4),
-@@ -237,6 +246,8 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
- assert(twoJoinsDF.queryExecution.executedPlan.collect {
- case WholeStageCodegenExec(_ : ShuffledHashJoinExec) if hint ==
"SHUFFLE_HASH" => true
- case WholeStageCodegenExec(_ : SortMergeJoinExec) if hint ==
"SHUFFLE_MERGE" => true
-+ case _: CometHashJoinExec if hint == "SHUFFLE_HASH" => true
-+ case _: CometSortMergeJoinExec if hint == "SHUFFLE_MERGE" => true
- }.size === 2)
- checkAnswer(twoJoinsDF,
- Seq(Row(0, 0, 0), Row(1, 1, null), Row(2, 2, 2), Row(3, 3, null),
Row(4, 4, null),
-@@ -258,6 +269,8 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
- assert(rightJoinUniqueDf.queryExecution.executedPlan.collect {
- case WholeStageCodegenExec(_: ShuffledHashJoinExec) if hint ==
"SHUFFLE_HASH" => true
- case WholeStageCodegenExec(_: SortMergeJoinExec) if hint ==
"SHUFFLE_MERGE" => true
-+ case _: CometHashJoinExec if hint == "SHUFFLE_HASH" => true
-+ case _: CometSortMergeJoinExec if hint == "SHUFFLE_MERGE" => true
- }.size === 1)
- checkAnswer(rightJoinUniqueDf, Seq(Row(1, 1), Row(2, 2), Row(3, 3),
Row(4, 4),
- Row(null, 5), Row(null, 6), Row(null, 7), Row(null, 8), Row(null,
9),
-@@ -269,6 +282,8 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
- assert(leftJoinUniqueDf.queryExecution.executedPlan.collect {
- case WholeStageCodegenExec(_: ShuffledHashJoinExec) if hint ==
"SHUFFLE_HASH" => true
- case WholeStageCodegenExec(_: SortMergeJoinExec) if hint ==
"SHUFFLE_MERGE" => true
-+ case _: CometHashJoinExec if hint == "SHUFFLE_HASH" => true
-+ case _: CometSortMergeJoinExec if hint == "SHUFFLE_MERGE" => true
- }.size === 1)
- checkAnswer(leftJoinUniqueDf, Seq(Row(0, null), Row(1, 1), Row(2, 2),
Row(3, 3), Row(4, 4)))
- assert(leftJoinUniqueDf.count() === 5)
-@@ -278,6 +293,8 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
- assert(rightJoinNonUniqueDf.queryExecution.executedPlan.collect {
- case WholeStageCodegenExec(_: ShuffledHashJoinExec) if hint ==
"SHUFFLE_HASH" => true
- case WholeStageCodegenExec(_: SortMergeJoinExec) if hint ==
"SHUFFLE_MERGE" => true
-+ case _: CometHashJoinExec if hint == "SHUFFLE_HASH" => true
-+ case _: CometSortMergeJoinExec if hint == "SHUFFLE_MERGE" => true
- }.size === 1)
- checkAnswer(rightJoinNonUniqueDf, Seq(Row(0, 3), Row(0, 6), Row(0,
9), Row(1, 1),
- Row(1, 4), Row(1, 7), Row(1, 10), Row(2, 2), Row(2, 5), Row(2, 8)))
-@@ -287,6 +304,8 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
- assert(leftJoinNonUniqueDf.queryExecution.executedPlan.collect {
- case WholeStageCodegenExec(_: ShuffledHashJoinExec) if hint ==
"SHUFFLE_HASH" => true
- case WholeStageCodegenExec(_: SortMergeJoinExec) if hint ==
"SHUFFLE_MERGE" => true
-+ case _: CometHashJoinExec if hint == "SHUFFLE_HASH" => true
-+ case _: CometSortMergeJoinExec if hint == "SHUFFLE_MERGE" => true
- }.size === 1)
- checkAnswer(leftJoinNonUniqueDf, Seq(Row(0, 3), Row(0, 6), Row(0, 9),
Row(1, 1),
- Row(1, 4), Row(1, 7), Row(1, 10), Row(2, 2), Row(2, 5), Row(2, 8),
Row(3, null),
-@@ -298,6 +317,8 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
- assert(rightJoinWithNonEquiDf.queryExecution.executedPlan.collect {
- case WholeStageCodegenExec(_: ShuffledHashJoinExec) if hint ==
"SHUFFLE_HASH" => true
- case WholeStageCodegenExec(_: SortMergeJoinExec) if hint ==
"SHUFFLE_MERGE" => true
-+ case _: CometHashJoinExec if hint == "SHUFFLE_HASH" => true
-+ case _: CometSortMergeJoinExec if hint == "SHUFFLE_MERGE" => true
- }.size === 1)
- checkAnswer(rightJoinWithNonEquiDf, Seq(Row(0, 6), Row(0, 9), Row(1,
1), Row(1, 7),
- Row(1, 10), Row(2, 2), Row(2, 8), Row(null, 3), Row(null, 4),
Row(null, 5)))
-@@ -308,6 +329,8 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
- assert(leftJoinWithNonEquiDf.queryExecution.executedPlan.collect {
- case WholeStageCodegenExec(_: ShuffledHashJoinExec) if hint ==
"SHUFFLE_HASH" => true
- case WholeStageCodegenExec(_: SortMergeJoinExec) if hint ==
"SHUFFLE_MERGE" => true
-+ case _: CometHashJoinExec if hint == "SHUFFLE_HASH" => true
-+ case _: CometSortMergeJoinExec if hint == "SHUFFLE_MERGE" => true
- }.size === 1)
- checkAnswer(leftJoinWithNonEquiDf, Seq(Row(0, 6), Row(0, 9), Row(1,
1), Row(1, 7),
- Row(1, 10), Row(2, 2), Row(2, 8), Row(3, null), Row(4, null)))
-@@ -318,6 +341,8 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
- assert(twoRightJoinsDf.queryExecution.executedPlan.collect {
- case WholeStageCodegenExec(_: ShuffledHashJoinExec) if hint ==
"SHUFFLE_HASH" => true
- case WholeStageCodegenExec(_: SortMergeJoinExec) if hint ==
"SHUFFLE_MERGE" => true
-+ case _: CometHashJoinExec if hint == "SHUFFLE_HASH" => true
-+ case _: CometSortMergeJoinExec if hint == "SHUFFLE_MERGE" => true
- }.size === 2)
- checkAnswer(twoRightJoinsDf, Seq(Row(2, 2, 2), Row(3, 3, 3), Row(4,
4, 4)))
-
-@@ -327,6 +352,8 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
- assert(twoLeftJoinsDf.queryExecution.executedPlan.collect {
- case WholeStageCodegenExec(_: ShuffledHashJoinExec) if hint ==
"SHUFFLE_HASH" => true
- case WholeStageCodegenExec(_: SortMergeJoinExec) if hint ==
"SHUFFLE_MERGE" => true
-+ case _: CometHashJoinExec if hint == "SHUFFLE_HASH" => true
-+ case _: CometSortMergeJoinExec if hint == "SHUFFLE_MERGE" => true
- }.size === 2)
- checkAnswer(twoLeftJoinsDf,
- Seq(Row(0, null, null), Row(1, 1, null), Row(2, 2, 2), Row(3, 3,
3), Row(4, 4, 4)))
-@@ -343,6 +370,7 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
- val oneLeftOuterJoinDF = df1.join(df2.hint("SHUFFLE_MERGE"), $"k1" ===
$"k2", "left_outer")
- assert(oneLeftOuterJoinDF.queryExecution.executedPlan.collect {
- case WholeStageCodegenExec(_ : SortMergeJoinExec) => true
-+ case _: CometSortMergeJoinExec => true
- }.size === 1)
- checkAnswer(oneLeftOuterJoinDF, Seq(Row(0, 0), Row(1, 1), Row(2, 2),
Row(3, 3), Row(4, null),
- Row(5, null), Row(6, null), Row(7, null), Row(8, null), Row(9, null)))
-@@ -351,6 +379,7 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
- val oneRightOuterJoinDF = df2.join(df3.hint("SHUFFLE_MERGE"), $"k2" ===
$"k3", "right_outer")
- assert(oneRightOuterJoinDF.queryExecution.executedPlan.collect {
- case WholeStageCodegenExec(_ : SortMergeJoinExec) => true
-+ case _: CometSortMergeJoinExec => true
- }.size === 1)
- checkAnswer(oneRightOuterJoinDF, Seq(Row(0, 0), Row(1, 1), Row(2, 2),
Row(3, 3), Row(null, 4),
- Row(null, 5)))
-@@ -360,6 +389,7 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
- .join(df1.hint("SHUFFLE_MERGE"), $"k3" === $"k1", "right_outer")
- assert(twoJoinsDF.queryExecution.executedPlan.collect {
- case WholeStageCodegenExec(_ : SortMergeJoinExec) => true
-+ case _: CometSortMergeJoinExec => true
- }.size === 2)
- checkAnswer(twoJoinsDF,
- Seq(Row(0, 0, 0), Row(1, 1, 1), Row(2, 2, 2), Row(3, 3, 3), Row(4,
null, 4), Row(5, null, 5),
-@@ -375,6 +405,7 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
- val oneJoinDF = df1.join(df2.hint("SHUFFLE_MERGE"), $"k1" === $"k2",
"left_semi")
- assert(oneJoinDF.queryExecution.executedPlan.collect {
- case WholeStageCodegenExec(ProjectExec(_, _ : SortMergeJoinExec)) =>
true
-+ case _: CometSortMergeJoinExec => true
- }.size === 1)
- checkAnswer(oneJoinDF, Seq(Row(0), Row(1), Row(2), Row(3)))
-
-@@ -382,8 +413,8 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
- val twoJoinsDF = df3.join(df2.hint("SHUFFLE_MERGE"), $"k3" === $"k2",
"left_semi")
- .join(df1.hint("SHUFFLE_MERGE"), $"k3" === $"k1", "left_semi")
- assert(twoJoinsDF.queryExecution.executedPlan.collect {
-- case WholeStageCodegenExec(ProjectExec(_, _ : SortMergeJoinExec)) |
-- WholeStageCodegenExec(_ : SortMergeJoinExec) => true
-+ case _: SortMergeJoinExec => true
-+ case _: CometSortMergeJoinExec => true
- }.size === 2)
- checkAnswer(twoJoinsDF, Seq(Row(0), Row(1), Row(2), Row(3)))
- }
-@@ -397,6 +428,7 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
- val oneJoinDF = df1.join(df2.hint("SHUFFLE_MERGE"), $"k1" === $"k2",
"left_anti")
- assert(oneJoinDF.queryExecution.executedPlan.collect {
- case WholeStageCodegenExec(ProjectExec(_, _ : SortMergeJoinExec)) =>
true
-+ case _: CometSortMergeJoinExec => true
- }.size === 1)
- checkAnswer(oneJoinDF, Seq(Row(4), Row(5), Row(6), Row(7), Row(8),
Row(9)))
-
-@@ -404,8 +436,8 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
- val twoJoinsDF = df1.join(df2.hint("SHUFFLE_MERGE"), $"k1" === $"k2",
"left_anti")
- .join(df3.hint("SHUFFLE_MERGE"), $"k1" === $"k3", "left_anti")
- assert(twoJoinsDF.queryExecution.executedPlan.collect {
-- case WholeStageCodegenExec(ProjectExec(_, _ : SortMergeJoinExec)) |
-- WholeStageCodegenExec(_ : SortMergeJoinExec) => true
-+ case _: SortMergeJoinExec => true
-+ case _: CometSortMergeJoinExec => true
- }.size === 2)
- checkAnswer(twoJoinsDF, Seq(Row(6), Row(7), Row(8), Row(9)))
- }
-@@ -538,7 +570,10 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
- val plan = df.queryExecution.executedPlan
- assert(plan.exists(p =>
- p.isInstanceOf[WholeStageCodegenExec] &&
-- p.asInstanceOf[WholeStageCodegenExec].child.isInstanceOf[SortExec]))
-+ p.asInstanceOf[WholeStageCodegenExec].collect {
-+ case _: SortExec => true
-+ case _: CometSortExec => true
-+ }.nonEmpty))
- assert(df.collect() === Array(Row(1), Row(2), Row(3)))
- }
-
-@@ -718,7 +753,9 @@ class WholeStageCodegenSuite extends QueryTest with
SharedSparkSession
- .write.mode(SaveMode.Overwrite).parquet(path)
-
- withSQLConf(SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> "255",
-- SQLConf.WHOLESTAGE_SPLIT_CONSUME_FUNC_BY_OPERATOR.key -> "true") {
-+ SQLConf.WHOLESTAGE_SPLIT_CONSUME_FUNC_BY_OPERATOR.key -> "true",
-+ // Disable Comet native execution because this checks wholestage
codegen.
-+ "spark.comet.exec.enabled" -> "false") {
- val projection = Seq.tabulate(columnNum)(i => s"c$i + c$i as
newC$i")
- val df = spark.read.parquet(path).selectExpr(projection: _*)
+@@ -32,7 +32,7 @@ import org.apache.spark.sql.test.SharedSparkSession
+ import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
+
+ // Disable AQE because the WholeStageCodegenExec is added when running
QueryStageExec
+-class WholeStageCodegenSuite extends QueryTest with SharedSparkSession
++class WholeStageCodegenSuite extends QueryTest with SharedSparkSession with
IgnoreCometSuite
+ with DisableAdaptiveExecutionSuite {
+ import testImplicits._
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index a7efd0aa75e..baae0967a2a 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]