This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 068b900eb fix: Remove COMET_SHUFFLE_FALLBACK_TO_COLUMNAR hack (#1865)
068b900eb is described below
commit 068b900ebab231ddf2d7168654aa53962f3e7bb9
Author: Andy Grove <[email protected]>
AuthorDate: Mon Jun 9 16:01:29 2025 -0600
fix: Remove COMET_SHUFFLE_FALLBACK_TO_COLUMNAR hack (#1865)
---
.../main/scala/org/apache/comet/CometConf.scala | 7 --
dev/diffs/3.4.3.diff | 119 ++++++++++++++----
dev/diffs/3.5.4.diff | 107 +++++++++++++----
dev/diffs/3.5.5.diff | 133 +++++++++++++++++----
dev/diffs/3.5.6.diff | 133 +++++++++++++++++----
dev/diffs/4.0.0-preview1.diff | 121 +++++++++++++++----
.../org/apache/comet/rules/CometExecRule.scala | 13 +-
.../rules/EliminateRedundantTransitions.scala | 7 +-
.../spark/sql/comet/CometColumnarToRowExec.scala | 2 +
.../apache/spark/sql/CometTPCDSQuerySuite.scala | 1 +
.../scala/org/apache/spark/sql/CometTestBase.scala | 1 -
.../spark/sql/comet/CometPlanStabilitySuite.scala | 1 -
12 files changed, 509 insertions(+), 136 deletions(-)
diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala
index 9807ebe04..317303eb7 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -289,13 +289,6 @@ object CometConf extends ShimCometConf {
.checkValues(Set("native", "jvm", "auto"))
.createWithDefault("auto")
- val COMET_SHUFFLE_FALLBACK_TO_COLUMNAR: ConfigEntry[Boolean] =
- conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.fallbackToColumnar")
- .doc("Whether to try falling back to columnar shuffle when native shuffle is not supported")
- .internal()
- .booleanConf
- .createWithDefault(false)
-
val COMET_EXEC_BROADCAST_FORCE_ENABLED: ConfigEntry[Boolean] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.broadcast.enabled")
.doc(
diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff
index 18e91d9da..dda075f02 100644
--- a/dev/diffs/3.4.3.diff
+++ b/dev/diffs/3.4.3.diff
@@ -247,7 +247,7 @@ index cf40e944c09..bdd5be4f462 100644
test("A cached table preserves the partitioning and ordering of its cached
SparkPlan") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
-index 1cc09c3d7fc..b85b53a9688 100644
+index 1cc09c3d7fc..f031fa45c33 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.SparkException
@@ -268,16 +268,6 @@ index 1cc09c3d7fc..b85b53a9688 100644
}
assert(exchangePlans.length == 1)
}
-@@ -1100,7 +1100,8 @@ class DataFrameAggregateSuite extends QueryTest
- }
- }
-
-- test("SPARK-32038: NormalizeFloatingNumbers should work on distinct
aggregate") {
-+ test("SPARK-32038: NormalizeFloatingNumbers should work on distinct
aggregate",
-+ IgnoreComet("TODO:
https://github.com/apache/datafusion-comet/issues/1824")) {
- withTempView("view") {
- val nan1 = java.lang.Float.intBitsToFloat(0x7f800001)
- val nan2 = java.lang.Float.intBitsToFloat(0x7fffffff)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index 56e9520fdab..917932336df 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -363,6 +353,44 @@ index a9f69ab28a1..5d9d4f2cb83 100644
withTable("tbl") {
sql(
"""
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+index 433b4741979..07148eee480 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+@@ -23,8 +23,9 @@ import org.apache.spark.TestUtils.{assertNotSpilled, assertSpilled}
+ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, Lag, Literal, NonFoldableLiteral}
+ import org.apache.spark.sql.catalyst.optimizer.TransposeWindow
+ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
++import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+-import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, Exchange, ShuffleExchangeExec}
++import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, Exchange, ShuffleExchangeExec, ShuffleExchangeLike}
+ import org.apache.spark.sql.execution.window.WindowExec
+ import org.apache.spark.sql.expressions.{Aggregator, MutableAggregationBuffer, UserDefinedAggregateFunction, Window}
+ import org.apache.spark.sql.functions._
+@@ -1186,10 +1187,12 @@ class DataFrameWindowFunctionsSuite extends QueryTest
+ }
+
+ def isShuffleExecByRequirement(
+- plan: ShuffleExchangeExec,
++ plan: ShuffleExchangeLike,
+ desiredClusterColumns: Seq[String]): Boolean = plan match {
+ case ShuffleExchangeExec(op: HashPartitioning, _, ENSURE_REQUIREMENTS) =>
+ partitionExpressionsColumns(op.expressions) === desiredClusterColumns
++ case CometShuffleExchangeExec(op: HashPartitioning, _, _, ENSURE_REQUIREMENTS, _, _) =>
++ partitionExpressionsColumns(op.expressions) === desiredClusterColumns
+ case _ => false
+ }
+
+@@ -1212,7 +1215,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest
+ val shuffleByRequirement = windowed.queryExecution.executedPlan.exists {
+ case w: WindowExec =>
+ w.child.exists {
+- case s: ShuffleExchangeExec => isShuffleExecByRequirement(s, Seq("key1", "key2"))
++ case s: ShuffleExchangeLike => isShuffleExecByRequirement(s, Seq("key1", "key2"))
+ case _ => false
+ }
+ case _ => false
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index daef11ae4d6..9f3cc9181f2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -386,7 +414,7 @@ index daef11ae4d6..9f3cc9181f2 100644
assert(exchanges.size == 2)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
-index f33432ddb6f..cc5224af735 100644
+index f33432ddb6f..1925aac8d97 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
@@ -417,17 +445,37 @@ index f33432ddb6f..cc5224af735 100644
Given("disable broadcast pruning and disable subquery duplication")
withSQLConf(
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true",
-@@ -1215,7 +1220,8 @@ abstract class DynamicPartitionPruningSuiteBase
+@@ -1027,7 +1032,8 @@ abstract class DynamicPartitionPruningSuiteBase
+ }
+ }
+
+- test("avoid reordering broadcast join keys to match input hash
partitioning") {
++ test("avoid reordering broadcast join keys to match input hash
partitioning",
++ IgnoreComet("TODO:
https://github.com/apache/datafusion-comet/issues/1839")) {
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key ->
"false",
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+ withTable("large", "dimTwo", "dimThree") {
+@@ -1215,7 +1221,8 @@ abstract class DynamicPartitionPruningSuiteBase
}
test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " +
- "canonicalization and exchange reuse") {
+ "canonicalization and exchange reuse",
-+ IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #1737")) {
++ IgnoreComet("TODO:
https://github.com/apache/datafusion-comet/issues/1839")) {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key ->
"true") {
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
val df = sql(
-@@ -1729,6 +1735,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
+@@ -1423,7 +1430,8 @@ abstract class DynamicPartitionPruningSuiteBase
+ }
+ }
+
+- test("SPARK-34637: DPP side broadcast query stage is created firstly") {
++ test("SPARK-34637: DPP side broadcast query stage is created firstly",
++ IgnoreComet("TODO:
https://github.com/apache/datafusion-comet/issues/1839")) {
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key ->
"true") {
+ val df = sql(
+ """ WITH v as (
+@@ -1729,6 +1737,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
case s: BatchScanExec =>
// we use f1 col for v2 tables due to schema pruning
s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1")))
@@ -611,7 +659,7 @@ index 1792b4c32eb..1616e6f39bd 100644
assert(shuffleMergeJoins.size == 1)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
-index 7f062bfb899..b347ef905d2 100644
+index 7f062bfb899..0ed85486e80 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -30,7 +30,8 @@ import org.apache.spark.sql.catalyst.TableIdentifier
@@ -707,7 +755,7 @@ index 7f062bfb899..b347ef905d2 100644
// Same result between shuffled hash join and sort merge join
checkAnswer(shjDF, smjResult)
}
-@@ -1282,18 +1292,25 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
+@@ -1282,18 +1292,26 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
}
// Test shuffled hash join
@@ -722,6 +770,7 @@ index 7f062bfb899..b347ef905d2 100644
+ true
+ case WholeStageCodegenExec(ColumnarToRowExec(
+ InputAdapter(CometProjectExec(_, _, _, _, _: CometHashJoinExec, _)))) => true
++ case _: CometHashJoinExec => true
}.size === 1)
checkAnswer(shjCodegenDF, Seq.empty)
@@ -735,7 +784,7 @@ index 7f062bfb899..b347ef905d2 100644
checkAnswer(shjNonCodegenDF, Seq.empty)
}
}
-@@ -1341,7 +1358,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
+@@ -1341,7 +1359,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
val plan = sql(getAggQuery(selectExpr, joinType)).queryExecution.executedPlan
assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1)
// Have shuffle before aggregation
@@ -745,7 +794,7 @@ index 7f062bfb899..b347ef905d2 100644
}
def getJoinQuery(selectExpr: String, joinType: String): String = {
-@@ -1370,9 +1388,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
+@@ -1370,9 +1389,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
}
val plan = sql(getJoinQuery(selectExpr, joinType)).queryExecution.executedPlan
assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1)
@@ -760,7 +809,7 @@ index 7f062bfb899..b347ef905d2 100644
}
// Test output ordering is not preserved
-@@ -1381,9 +1402,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
+@@ -1381,9 +1403,12 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
val selectExpr = "/*+ BROADCAST(left_t) */ k1 as k0"
val plan = sql(getJoinQuery(selectExpr, joinType)).queryExecution.executedPlan
assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1)
@@ -775,7 +824,7 @@ index 7f062bfb899..b347ef905d2 100644
}
// Test singe partition
-@@ -1393,7 +1417,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
+@@ -1393,7 +1418,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
|FROM range(0, 10, 1, 1) t1 FULL OUTER JOIN range(0, 10, 1, 1) t2
|""".stripMargin)
val plan = fullJoinDF.queryExecution.executedPlan
@@ -785,7 +834,7 @@ index 7f062bfb899..b347ef905d2 100644
checkAnswer(fullJoinDF, Row(100))
}
}
-@@ -1438,6 +1463,9 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
+@@ -1438,6 +1464,9 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
Seq(semiJoinDF, antiJoinDF).foreach { df =>
assert(collect(df.queryExecution.executedPlan) {
case j: ShuffledHashJoinExec if j.ignoreDuplicatedKey == ignoreDuplicatedKey => true
@@ -795,7 +844,7 @@ index 7f062bfb899..b347ef905d2 100644
}.size == 1)
}
}
-@@ -1482,14 +1510,20 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
+@@ -1482,14 +1511,20 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
test("SPARK-43113: Full outer join with duplicate stream-side references in condition (SMJ)") {
def check(plan: SparkPlan): Unit = {
@@ -818,7 +867,7 @@ index 7f062bfb899..b347ef905d2 100644
}
dupStreamSideColTest("SHUFFLE_HASH", check)
}
-@@ -1605,7 +1639,8 @@ class ThreadLeakInSortMergeJoinSuite
+@@ -1605,7 +1640,8 @@ class ThreadLeakInSortMergeJoinSuite
sparkConf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, 20))
}
@@ -1280,6 +1329,28 @@ index 47679ed7865..9ffbaecb98e 100644
}.length == hashAggCount)
assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s
}.length == sortAggCount)
}
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
+index eec396b2e39..bf3f1c769d6 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
+@@ -18,7 +18,7 @@
+ package org.apache.spark.sql.execution
+
+ import org.apache.spark.TestUtils.assertSpilled
+-import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
++import org.apache.spark.sql.{AnalysisException, IgnoreComet, QueryTest, Row}
+ import org.apache.spark.sql.internal.SQLConf.{WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD, WINDOW_EXEC_BUFFER_SPILL_THRESHOLD}
+ import org.apache.spark.sql.test.SharedSparkSession
+
+@@ -470,7 +470,7 @@ class SQLWindowFunctionSuite extends QueryTest with SharedSparkSession {
+ Row(1, 3, null) :: Row(2, null, 4) :: Nil)
+ }
+
+- test("test with low buffer spill threshold") {
++ test("test with low buffer spill threshold", IgnoreComet("Comet does not
support spilling")) {
+ val nums = sparkContext.parallelize(1 to 10).map(x => (x, x %
2)).toDF("x", "y")
+ nums.createOrReplaceTempView("nums")
+
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
index b14f4a405f6..ab7baf434a5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
diff --git a/dev/diffs/3.5.4.diff b/dev/diffs/3.5.4.diff
index 32d4d617e..369587175 100644
--- a/dev/diffs/3.5.4.diff
+++ b/dev/diffs/3.5.4.diff
@@ -226,7 +226,7 @@ index 9815cb816c9..95b5f9992b0 100644
test("A cached table preserves the partitioning and ordering of its cached
SparkPlan") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
-index 5a8681aed97..db69fde723a 100644
+index 5a8681aed97..da9d25e2eb4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical.Expand
@@ -247,16 +247,6 @@ index 5a8681aed97..db69fde723a 100644
}
assert(exchangePlans.length == 1)
}
-@@ -1255,7 +1255,8 @@ class DataFrameAggregateSuite extends QueryTest
- }
- }
-
-- test("SPARK-32038: NormalizeFloatingNumbers should work on distinct
aggregate") {
-+ test("SPARK-32038: NormalizeFloatingNumbers should work on distinct
aggregate",
-+ IgnoreComet("TODO:
https://github.com/apache/datafusion-comet/issues/1824")) {
- withTempView("view") {
- val nan1 = java.lang.Float.intBitsToFloat(0x7f800001)
- val nan2 = java.lang.Float.intBitsToFloat(0x7fffffff)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index 56e9520fdab..917932336df 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -342,6 +332,44 @@ index 7ee18df3756..64f01a68048 100644
withTable("tbl") {
sql(
"""
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+index 47a311c71d5..342e71cfdd4 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+@@ -24,8 +24,9 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression
+ import org.apache.spark.sql.catalyst.optimizer.TransposeWindow
+ import org.apache.spark.sql.catalyst.plans.logical.{Window => LogicalWindow}
+ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
++import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+-import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, Exchange, ShuffleExchangeExec}
++import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, Exchange, ShuffleExchangeExec, ShuffleExchangeLike}
+ import org.apache.spark.sql.execution.window.WindowExec
+ import org.apache.spark.sql.expressions.{Aggregator, MutableAggregationBuffer, UserDefinedAggregateFunction, Window}
+ import org.apache.spark.sql.functions._
+@@ -1187,10 +1188,12 @@ class DataFrameWindowFunctionsSuite extends QueryTest
+ }
+
+ def isShuffleExecByRequirement(
+- plan: ShuffleExchangeExec,
++ plan: ShuffleExchangeLike,
+ desiredClusterColumns: Seq[String]): Boolean = plan match {
+ case ShuffleExchangeExec(op: HashPartitioning, _, ENSURE_REQUIREMENTS, _) =>
+ partitionExpressionsColumns(op.expressions) === desiredClusterColumns
++ case CometShuffleExchangeExec(op: HashPartitioning, _, _, ENSURE_REQUIREMENTS, _, _) =>
++ partitionExpressionsColumns(op.expressions) === desiredClusterColumns
+ case _ => false
+ }
+
+@@ -1213,7 +1216,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest
+ val shuffleByRequirement = windowed.queryExecution.executedPlan.exists {
+ case w: WindowExec =>
+ w.child.exists {
+- case s: ShuffleExchangeExec => isShuffleExecByRequirement(s, Seq("key1", "key2"))
++ case s: ShuffleExchangeLike => isShuffleExecByRequirement(s, Seq("key1", "key2"))
+ case _ => false
+ }
+ case _ => false
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index f32b32ffc5a..447d7c6416e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -365,7 +393,7 @@ index f32b32ffc5a..447d7c6416e 100644
assert(exchanges.size == 2)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
-index f33432ddb6f..19ce507e82b 100644
+index f33432ddb6f..0fa49fb3f0b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
@@ -422,7 +450,7 @@ index f33432ddb6f..19ce507e82b 100644
- test("avoid reordering broadcast join keys to match input hash
partitioning") {
+ test("avoid reordering broadcast join keys to match input hash
partitioning",
-+ IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #242")) {
++ IgnoreComet("TODO:
https://github.com/apache/datafusion-comet/issues/1839")) {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key ->
"false",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
withTable("large", "dimTwo", "dimThree") {
@@ -442,7 +470,7 @@ index f33432ddb6f..19ce507e82b 100644
test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " +
- "canonicalization and exchange reuse") {
+ "canonicalization and exchange reuse",
-+ IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #242")) {
++ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
val df = sql(
@@ -482,7 +510,7 @@ index f33432ddb6f..19ce507e82b 100644
- test("SPARK-34637: DPP side broadcast query stage is created firstly") {
+ test("SPARK-34637: DPP side broadcast query stage is created firstly",
-+ IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #242")) {
++ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
val df = sql(
""" WITH v as (
@@ -746,7 +774,7 @@ index 7af826583bd..3c3def1eb67 100644
assert(shuffleMergeJoins.size == 1)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
-index 4d256154c85..43f0bebb00c 100644
+index 4d256154c85..66a5473852d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -31,7 +31,8 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
@@ -872,7 +900,7 @@ index 4d256154c85..43f0bebb00c 100644
}.size === 1)
// Same result between shuffled hash join and sort merge join
checkAnswer(shjDF, smjResult)
-@@ -1432,13 +1446,19 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
+@@ -1432,13 +1446,20 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
assert(shjCodegenDF.queryExecution.executedPlan.collect {
case WholeStageCodegenExec(_ : ShuffledHashJoinExec) => true
case WholeStageCodegenExec(ProjectExec(_, _ : ShuffledHashJoinExec)) => true
@@ -880,6 +908,7 @@ index 4d256154c85..43f0bebb00c 100644
+ true
+ case WholeStageCodegenExec(ColumnarToRowExec(
+ InputAdapter(CometProjectExec(_, _, _, _, _: CometHashJoinExec, _)))) => true
++ case _: CometHashJoinExec => true
}.size === 1)
checkAnswer(shjCodegenDF, Seq.empty)
@@ -893,7 +922,7 @@ index 4d256154c85..43f0bebb00c 100644
checkAnswer(shjNonCodegenDF, Seq.empty)
}
}
-@@ -1486,7 +1506,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
+@@ -1486,7 +1507,8 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
val plan = sql(getAggQuery(selectExpr, joinType)).queryExecution.executedPlan
assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true }.size === 1)
// Have shuffle before aggregation
@@ -903,7 +932,7 @@ index 4d256154c85..43f0bebb00c 100644
}
def getJoinQuery(selectExpr: String, joinType: String): String = {
-@@ -1515,9 +1536,12 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
+@@ -1515,9 +1537,12 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
}
val plan = sql(getJoinQuery(selectExpr,
joinType)).queryExecution.executedPlan
assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true
}.size === 1)
@@ -918,7 +947,7 @@ index 4d256154c85..43f0bebb00c 100644
}
// Test output ordering is not preserved
-@@ -1526,9 +1550,12 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
+@@ -1526,9 +1551,12 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
val selectExpr = "/*+ BROADCAST(left_t) */ k1 as k0"
val plan = sql(getJoinQuery(selectExpr,
joinType)).queryExecution.executedPlan
assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true
}.size === 1)
@@ -933,7 +962,7 @@ index 4d256154c85..43f0bebb00c 100644
}
// Test singe partition
-@@ -1538,7 +1565,8 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
+@@ -1538,7 +1566,8 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
|FROM range(0, 10, 1, 1) t1 FULL OUTER JOIN range(0, 10, 1, 1) t2
|""".stripMargin)
val plan = fullJoinDF.queryExecution.executedPlan
@@ -943,7 +972,7 @@ index 4d256154c85..43f0bebb00c 100644
checkAnswer(fullJoinDF, Row(100))
}
}
-@@ -1583,6 +1611,9 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
+@@ -1583,6 +1612,9 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
Seq(semiJoinDF, antiJoinDF).foreach { df =>
assert(collect(df.queryExecution.executedPlan) {
case j: ShuffledHashJoinExec if j.ignoreDuplicatedKey == ignoreDuplicatedKey => true
@@ -953,7 +982,7 @@ index 4d256154c85..43f0bebb00c 100644
}.size == 1)
}
}
-@@ -1627,14 +1658,20 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
+@@ -1627,14 +1659,20 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlan
test("SPARK-43113: Full outer join with duplicate stream-side references in condition (SMJ)") {
def check(plan: SparkPlan): Unit = {
@@ -976,6 +1005,16 @@ index 4d256154c85..43f0bebb00c 100644
}
dupStreamSideColTest("SHUFFLE_HASH", check)
}
+@@ -1770,7 +1808,8 @@ class ThreadLeakInSortMergeJoinSuite
+ sparkConf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, 20))
+ }
+
+- test("SPARK-47146: thread leak when doing SortMergeJoin (with spill)") {
++ test("SPARK-47146: thread leak when doing SortMergeJoin (with spill)",
++ IgnoreComet("Comet does not support spilling")) {
+
+ withSQLConf(
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1") {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala
index c26757c9cff..d55775f09d7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala
@@ -1451,6 +1490,28 @@ index 47679ed7865..9ffbaecb98e 100644
}.length == hashAggCount)
assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s
}.length == sortAggCount)
}
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
+index eec396b2e39..bf3f1c769d6 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
+@@ -18,7 +18,7 @@
+ package org.apache.spark.sql.execution
+
+ import org.apache.spark.TestUtils.assertSpilled
+-import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
++import org.apache.spark.sql.{AnalysisException, IgnoreComet, QueryTest, Row}
+ import
org.apache.spark.sql.internal.SQLConf.{WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD,
WINDOW_EXEC_BUFFER_SPILL_THRESHOLD}
+ import org.apache.spark.sql.test.SharedSparkSession
+
+@@ -470,7 +470,7 @@ class SQLWindowFunctionSuite extends QueryTest with
SharedSparkSession {
+ Row(1, 3, null) :: Row(2, null, 4) :: Nil)
+ }
+
+- test("test with low buffer spill threshold") {
++ test("test with low buffer spill threshold", IgnoreComet("Comet does not
support spilling")) {
+ val nums = sparkContext.parallelize(1 to 10).map(x => (x, x %
2)).toDF("x", "y")
+ nums.createOrReplaceTempView("nums")
+
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
index b14f4a405f6..ab7baf434a5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
diff --git a/dev/diffs/3.5.5.diff b/dev/diffs/3.5.5.diff
index 9ca531087..75928a077 100644
--- a/dev/diffs/3.5.5.diff
+++ b/dev/diffs/3.5.5.diff
@@ -226,7 +226,7 @@ index 9815cb816c9..95b5f9992b0 100644
test("A cached table preserves the partitioning and ordering of its cached
SparkPlan") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
-index 5a8681aed97..db69fde723a 100644
+index 5a8681aed97..da9d25e2eb4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical.Expand
@@ -247,16 +247,6 @@ index 5a8681aed97..db69fde723a 100644
}
assert(exchangePlans.length == 1)
}
-@@ -1255,7 +1255,8 @@ class DataFrameAggregateSuite extends QueryTest
- }
- }
-
-- test("SPARK-32038: NormalizeFloatingNumbers should work on distinct
aggregate") {
-+ test("SPARK-32038: NormalizeFloatingNumbers should work on distinct
aggregate",
-+ IgnoreComet("TODO:
https://github.com/apache/datafusion-comet/issues/1824")) {
- withTempView("view") {
- val nan1 = java.lang.Float.intBitsToFloat(0x7f800001)
- val nan2 = java.lang.Float.intBitsToFloat(0x7fffffff)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index 56e9520fdab..917932336df 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -342,6 +332,44 @@ index 7ee18df3756..64f01a68048 100644
withTable("tbl") {
sql(
"""
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+index 47a311c71d5..342e71cfdd4 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+@@ -24,8 +24,9 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression
+ import org.apache.spark.sql.catalyst.optimizer.TransposeWindow
+ import org.apache.spark.sql.catalyst.plans.logical.{Window => LogicalWindow}
+ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
++import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+-import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, Exchange, ShuffleExchangeExec}
++import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, Exchange, ShuffleExchangeExec, ShuffleExchangeLike}
+ import org.apache.spark.sql.execution.window.WindowExec
+ import org.apache.spark.sql.expressions.{Aggregator, MutableAggregationBuffer, UserDefinedAggregateFunction, Window}
+ import org.apache.spark.sql.functions._
+@@ -1187,10 +1188,12 @@ class DataFrameWindowFunctionsSuite extends QueryTest
+ }
+
+ def isShuffleExecByRequirement(
+- plan: ShuffleExchangeExec,
++ plan: ShuffleExchangeLike,
+ desiredClusterColumns: Seq[String]): Boolean = plan match {
+ case ShuffleExchangeExec(op: HashPartitioning, _, ENSURE_REQUIREMENTS, _) =>
+ partitionExpressionsColumns(op.expressions) === desiredClusterColumns
++ case CometShuffleExchangeExec(op: HashPartitioning, _, _, ENSURE_REQUIREMENTS, _, _) =>
++ partitionExpressionsColumns(op.expressions) === desiredClusterColumns
+ case _ => false
+ }
+
+@@ -1213,7 +1216,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest
+ val shuffleByRequirement = windowed.queryExecution.executedPlan.exists {
+ case w: WindowExec =>
+ w.child.exists {
+- case s: ShuffleExchangeExec => isShuffleExecByRequirement(s, Seq("key1", "key2"))
++ case s: ShuffleExchangeLike => isShuffleExecByRequirement(s, Seq("key1", "key2"))
+ case _ => false
+ }
+ case _ => false
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index f32b32ffc5a..447d7c6416e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -365,7 +393,7 @@ index f32b32ffc5a..447d7c6416e 100644
assert(exchanges.size == 2)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
-index f33432ddb6f..fe9f74ff8f1 100644
+index f33432ddb6f..0e1499a24ca 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
@@ -386,7 +414,37 @@ index f33432ddb6f..fe9f74ff8f1 100644
case _ => Nil
}
}
-@@ -1729,6 +1733,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
+@@ -1027,7 +1031,8 @@ abstract class DynamicPartitionPruningSuiteBase
+ }
+ }
+
+- test("avoid reordering broadcast join keys to match input hash
partitioning") {
++ test("avoid reordering broadcast join keys to match input hash
partitioning",
++ IgnoreComet("TODO:
https://github.com/apache/datafusion-comet/issues/1839")) {
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key ->
"false",
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+ withTable("large", "dimTwo", "dimThree") {
+@@ -1215,7 +1220,8 @@ abstract class DynamicPartitionPruningSuiteBase
+ }
+
+ test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " +
+- "canonicalization and exchange reuse") {
++ "canonicalization and exchange reuse",
++ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) {
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+ val df = sql(
+@@ -1423,7 +1429,8 @@ abstract class DynamicPartitionPruningSuiteBase
+ }
+ }
+
+- test("SPARK-34637: DPP side broadcast query stage is created firstly") {
++ test("SPARK-34637: DPP side broadcast query stage is created firstly",
++ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) {
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
+ val df = sql(
+ """ WITH v as (
+@@ -1729,6 +1736,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
case s: BatchScanExec =>
// we use f1 col for v2 tables due to schema pruning
s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1")))
@@ -563,7 +621,7 @@ index 7af826583bd..3c3def1eb67 100644
assert(shuffleMergeJoins.size == 1)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
-index 4d256154c85..43f0bebb00c 100644
+index 4d256154c85..66a5473852d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -31,7 +31,8 @@ import
org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
@@ -689,7 +747,7 @@ index 4d256154c85..43f0bebb00c 100644
}.size === 1)
// Same result between shuffled hash join and sort merge join
checkAnswer(shjDF, smjResult)
-@@ -1432,13 +1446,19 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
+@@ -1432,13 +1446,20 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
assert(shjCodegenDF.queryExecution.executedPlan.collect {
case WholeStageCodegenExec(_ : ShuffledHashJoinExec) => true
case WholeStageCodegenExec(ProjectExec(_, _ :
ShuffledHashJoinExec)) => true
@@ -697,6 +755,7 @@ index 4d256154c85..43f0bebb00c 100644
+ true
+ case WholeStageCodegenExec(ColumnarToRowExec(
+ InputAdapter(CometProjectExec(_, _, _, _, _: CometHashJoinExec,
_)))) => true
++ case _: CometHashJoinExec => true
}.size === 1)
checkAnswer(shjCodegenDF, Seq.empty)
@@ -710,7 +769,7 @@ index 4d256154c85..43f0bebb00c 100644
checkAnswer(shjNonCodegenDF, Seq.empty)
}
}
-@@ -1486,7 +1506,8 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
+@@ -1486,7 +1507,8 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
val plan = sql(getAggQuery(selectExpr,
joinType)).queryExecution.executedPlan
assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true
}.size === 1)
// Have shuffle before aggregation
@@ -720,7 +779,7 @@ index 4d256154c85..43f0bebb00c 100644
}
def getJoinQuery(selectExpr: String, joinType: String): String = {
-@@ -1515,9 +1536,12 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
+@@ -1515,9 +1537,12 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
}
val plan = sql(getJoinQuery(selectExpr,
joinType)).queryExecution.executedPlan
assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true
}.size === 1)
@@ -735,7 +794,7 @@ index 4d256154c85..43f0bebb00c 100644
}
// Test output ordering is not preserved
-@@ -1526,9 +1550,12 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
+@@ -1526,9 +1551,12 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
val selectExpr = "/*+ BROADCAST(left_t) */ k1 as k0"
val plan = sql(getJoinQuery(selectExpr,
joinType)).queryExecution.executedPlan
assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true
}.size === 1)
@@ -750,7 +809,7 @@ index 4d256154c85..43f0bebb00c 100644
}
// Test singe partition
-@@ -1538,7 +1565,8 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
+@@ -1538,7 +1566,8 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
|FROM range(0, 10, 1, 1) t1 FULL OUTER JOIN range(0, 10, 1, 1) t2
|""".stripMargin)
val plan = fullJoinDF.queryExecution.executedPlan
@@ -760,7 +819,7 @@ index 4d256154c85..43f0bebb00c 100644
checkAnswer(fullJoinDF, Row(100))
}
}
-@@ -1583,6 +1611,9 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
+@@ -1583,6 +1612,9 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
Seq(semiJoinDF, antiJoinDF).foreach { df =>
assert(collect(df.queryExecution.executedPlan) {
case j: ShuffledHashJoinExec if j.ignoreDuplicatedKey ==
ignoreDuplicatedKey => true
@@ -770,7 +829,7 @@ index 4d256154c85..43f0bebb00c 100644
}.size == 1)
}
}
-@@ -1627,14 +1658,20 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
+@@ -1627,14 +1659,20 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
test("SPARK-43113: Full outer join with duplicate stream-side references in
condition (SMJ)") {
def check(plan: SparkPlan): Unit = {
@@ -793,6 +852,16 @@ index 4d256154c85..43f0bebb00c 100644
}
dupStreamSideColTest("SHUFFLE_HASH", check)
}
+@@ -1770,7 +1808,8 @@ class ThreadLeakInSortMergeJoinSuite
+ sparkConf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, 20))
+ }
+
+- test("SPARK-47146: thread leak when doing SortMergeJoin (with spill)") {
++ test("SPARK-47146: thread leak when doing SortMergeJoin (with spill)",
++ IgnoreComet("Comet does not support spilling")) {
+
+ withSQLConf(
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1") {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala
index c26757c9cff..d55775f09d7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala
@@ -1256,6 +1325,28 @@ index 47679ed7865..9ffbaecb98e 100644
}.length == hashAggCount)
assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s
}.length == sortAggCount)
}
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
+index eec396b2e39..bf3f1c769d6 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
+@@ -18,7 +18,7 @@
+ package org.apache.spark.sql.execution
+
+ import org.apache.spark.TestUtils.assertSpilled
+-import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
++import org.apache.spark.sql.{AnalysisException, IgnoreComet, QueryTest, Row}
+ import
org.apache.spark.sql.internal.SQLConf.{WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD,
WINDOW_EXEC_BUFFER_SPILL_THRESHOLD}
+ import org.apache.spark.sql.test.SharedSparkSession
+
+@@ -470,7 +470,7 @@ class SQLWindowFunctionSuite extends QueryTest with
SharedSparkSession {
+ Row(1, 3, null) :: Row(2, null, 4) :: Nil)
+ }
+
+- test("test with low buffer spill threshold") {
++ test("test with low buffer spill threshold", IgnoreComet("Comet does not
support spilling")) {
+ val nums = sparkContext.parallelize(1 to 10).map(x => (x, x %
2)).toDF("x", "y")
+ nums.createOrReplaceTempView("nums")
+
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
index b14f4a405f6..ab7baf434a5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
diff --git a/dev/diffs/3.5.6.diff b/dev/diffs/3.5.6.diff
index 42d5dad00..e3ba0a35c 100644
--- a/dev/diffs/3.5.6.diff
+++ b/dev/diffs/3.5.6.diff
@@ -226,7 +226,7 @@ index 9815cb816c9..95b5f9992b0 100644
test("A cached table preserves the partitioning and ordering of its cached
SparkPlan") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
-index 5a8681aed97..db69fde723a 100644
+index 5a8681aed97..da9d25e2eb4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical.Expand
@@ -247,16 +247,6 @@ index 5a8681aed97..db69fde723a 100644
}
assert(exchangePlans.length == 1)
}
-@@ -1255,7 +1255,8 @@ class DataFrameAggregateSuite extends QueryTest
- }
- }
-
-- test("SPARK-32038: NormalizeFloatingNumbers should work on distinct
aggregate") {
-+ test("SPARK-32038: NormalizeFloatingNumbers should work on distinct
aggregate",
-+ IgnoreComet("TODO:
https://github.com/apache/datafusion-comet/issues/1824")) {
- withTempView("view") {
- val nan1 = java.lang.Float.intBitsToFloat(0x7f800001)
- val nan2 = java.lang.Float.intBitsToFloat(0x7fffffff)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index 56e9520fdab..917932336df 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -342,6 +332,44 @@ index 7ee18df3756..64f01a68048 100644
withTable("tbl") {
sql(
"""
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+index 47a311c71d5..342e71cfdd4 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+@@ -24,8 +24,9 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression
+ import org.apache.spark.sql.catalyst.optimizer.TransposeWindow
+ import org.apache.spark.sql.catalyst.plans.logical.{Window => LogicalWindow}
+ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
++import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+-import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, Exchange, ShuffleExchangeExec}
++import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, Exchange, ShuffleExchangeExec, ShuffleExchangeLike}
+ import org.apache.spark.sql.execution.window.WindowExec
+ import org.apache.spark.sql.expressions.{Aggregator, MutableAggregationBuffer, UserDefinedAggregateFunction, Window}
+ import org.apache.spark.sql.functions._
+@@ -1187,10 +1188,12 @@ class DataFrameWindowFunctionsSuite extends QueryTest
+ }
+
+ def isShuffleExecByRequirement(
+- plan: ShuffleExchangeExec,
++ plan: ShuffleExchangeLike,
+ desiredClusterColumns: Seq[String]): Boolean = plan match {
+ case ShuffleExchangeExec(op: HashPartitioning, _, ENSURE_REQUIREMENTS, _) =>
+ partitionExpressionsColumns(op.expressions) === desiredClusterColumns
++ case CometShuffleExchangeExec(op: HashPartitioning, _, _, ENSURE_REQUIREMENTS, _, _) =>
++ partitionExpressionsColumns(op.expressions) === desiredClusterColumns
+ case _ => false
+ }
+
+@@ -1213,7 +1216,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest
+ val shuffleByRequirement = windowed.queryExecution.executedPlan.exists {
+ case w: WindowExec =>
+ w.child.exists {
+- case s: ShuffleExchangeExec => isShuffleExecByRequirement(s, Seq("key1", "key2"))
++ case s: ShuffleExchangeLike => isShuffleExecByRequirement(s, Seq("key1", "key2"))
+ case _ => false
+ }
+ case _ => false
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index f32b32ffc5a..447d7c6416e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -365,7 +393,7 @@ index f32b32ffc5a..447d7c6416e 100644
assert(exchanges.size == 2)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
-index f33432ddb6f..fe9f74ff8f1 100644
+index f33432ddb6f..0e1499a24ca 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
@@ -386,7 +414,37 @@ index f33432ddb6f..fe9f74ff8f1 100644
case _ => Nil
}
}
-@@ -1729,6 +1733,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
+@@ -1027,7 +1031,8 @@ abstract class DynamicPartitionPruningSuiteBase
+ }
+ }
+
+- test("avoid reordering broadcast join keys to match input hash
partitioning") {
++ test("avoid reordering broadcast join keys to match input hash
partitioning",
++ IgnoreComet("TODO:
https://github.com/apache/datafusion-comet/issues/1839")) {
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key ->
"false",
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+ withTable("large", "dimTwo", "dimThree") {
+@@ -1215,7 +1220,8 @@ abstract class DynamicPartitionPruningSuiteBase
+ }
+
+ test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " +
+- "canonicalization and exchange reuse") {
++ "canonicalization and exchange reuse",
++ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) {
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
+ withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+ val df = sql(
+@@ -1423,7 +1429,8 @@ abstract class DynamicPartitionPruningSuiteBase
+ }
+ }
+
+- test("SPARK-34637: DPP side broadcast query stage is created firstly") {
++ test("SPARK-34637: DPP side broadcast query stage is created firstly",
++ IgnoreComet("TODO: https://github.com/apache/datafusion-comet/issues/1839")) {
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
+ val df = sql(
+ """ WITH v as (
+@@ -1729,6 +1736,8 @@ abstract class DynamicPartitionPruningV1Suite extends DynamicPartitionPruningDat
case s: BatchScanExec =>
// we use f1 col for v2 tables due to schema pruning
s.output.exists(_.exists(_.argString(maxFields = 100).contains("f1")))
@@ -563,7 +621,7 @@ index 7af826583bd..3c3def1eb67 100644
assert(shuffleMergeJoins.size == 1)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
-index 4d256154c85..43f0bebb00c 100644
+index 4d256154c85..66a5473852d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -31,7 +31,8 @@ import
org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
@@ -689,7 +747,7 @@ index 4d256154c85..43f0bebb00c 100644
}.size === 1)
// Same result between shuffled hash join and sort merge join
checkAnswer(shjDF, smjResult)
-@@ -1432,13 +1446,19 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
+@@ -1432,13 +1446,20 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
assert(shjCodegenDF.queryExecution.executedPlan.collect {
case WholeStageCodegenExec(_ : ShuffledHashJoinExec) => true
case WholeStageCodegenExec(ProjectExec(_, _ :
ShuffledHashJoinExec)) => true
@@ -697,6 +755,7 @@ index 4d256154c85..43f0bebb00c 100644
+ true
+ case WholeStageCodegenExec(ColumnarToRowExec(
+ InputAdapter(CometProjectExec(_, _, _, _, _: CometHashJoinExec,
_)))) => true
++ case _: CometHashJoinExec => true
}.size === 1)
checkAnswer(shjCodegenDF, Seq.empty)
@@ -710,7 +769,7 @@ index 4d256154c85..43f0bebb00c 100644
checkAnswer(shjNonCodegenDF, Seq.empty)
}
}
-@@ -1486,7 +1506,8 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
+@@ -1486,7 +1507,8 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
val plan = sql(getAggQuery(selectExpr,
joinType)).queryExecution.executedPlan
assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true
}.size === 1)
// Have shuffle before aggregation
@@ -720,7 +779,7 @@ index 4d256154c85..43f0bebb00c 100644
}
def getJoinQuery(selectExpr: String, joinType: String): String = {
-@@ -1515,9 +1536,12 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
+@@ -1515,9 +1537,12 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
}
val plan = sql(getJoinQuery(selectExpr,
joinType)).queryExecution.executedPlan
assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true
}.size === 1)
@@ -735,7 +794,7 @@ index 4d256154c85..43f0bebb00c 100644
}
// Test output ordering is not preserved
-@@ -1526,9 +1550,12 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
+@@ -1526,9 +1551,12 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
val selectExpr = "/*+ BROADCAST(left_t) */ k1 as k0"
val plan = sql(getJoinQuery(selectExpr,
joinType)).queryExecution.executedPlan
assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true
}.size === 1)
@@ -750,7 +809,7 @@ index 4d256154c85..43f0bebb00c 100644
}
// Test singe partition
-@@ -1538,7 +1565,8 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
+@@ -1538,7 +1566,8 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
|FROM range(0, 10, 1, 1) t1 FULL OUTER JOIN range(0, 10, 1, 1) t2
|""".stripMargin)
val plan = fullJoinDF.queryExecution.executedPlan
@@ -760,7 +819,7 @@ index 4d256154c85..43f0bebb00c 100644
checkAnswer(fullJoinDF, Row(100))
}
}
-@@ -1583,6 +1611,9 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
+@@ -1583,6 +1612,9 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
Seq(semiJoinDF, antiJoinDF).foreach { df =>
assert(collect(df.queryExecution.executedPlan) {
case j: ShuffledHashJoinExec if j.ignoreDuplicatedKey ==
ignoreDuplicatedKey => true
@@ -770,7 +829,7 @@ index 4d256154c85..43f0bebb00c 100644
}.size == 1)
}
}
-@@ -1627,14 +1658,20 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
+@@ -1627,14 +1659,20 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
test("SPARK-43113: Full outer join with duplicate stream-side references in
condition (SMJ)") {
def check(plan: SparkPlan): Unit = {
@@ -793,6 +852,16 @@ index 4d256154c85..43f0bebb00c 100644
}
dupStreamSideColTest("SHUFFLE_HASH", check)
}
+@@ -1770,7 +1808,8 @@ class ThreadLeakInSortMergeJoinSuite
+ sparkConf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, 20))
+ }
+
+- test("SPARK-47146: thread leak when doing SortMergeJoin (with spill)") {
++ test("SPARK-47146: thread leak when doing SortMergeJoin (with spill)",
++ IgnoreComet("Comet does not support spilling")) {
+
+ withSQLConf(
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1") {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala
index c26757c9cff..d55775f09d7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala
@@ -1256,6 +1325,28 @@ index 47679ed7865..9ffbaecb98e 100644
}.length == hashAggCount)
assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s
}.length == sortAggCount)
}
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
+index eec396b2e39..bf3f1c769d6 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
+@@ -18,7 +18,7 @@
+ package org.apache.spark.sql.execution
+
+ import org.apache.spark.TestUtils.assertSpilled
+-import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
++import org.apache.spark.sql.{AnalysisException, IgnoreComet, QueryTest, Row}
+ import
org.apache.spark.sql.internal.SQLConf.{WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD,
WINDOW_EXEC_BUFFER_SPILL_THRESHOLD}
+ import org.apache.spark.sql.test.SharedSparkSession
+
+@@ -470,7 +470,7 @@ class SQLWindowFunctionSuite extends QueryTest with
SharedSparkSession {
+ Row(1, 3, null) :: Row(2, null, 4) :: Nil)
+ }
+
+- test("test with low buffer spill threshold") {
++ test("test with low buffer spill threshold", IgnoreComet("Comet does not
support spilling")) {
+ val nums = sparkContext.parallelize(1 to 10).map(x => (x, x %
2)).toDF("x", "y")
+ nums.createOrReplaceTempView("nums")
+
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
index b14f4a405f6..ab7baf434a5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
diff --git a/dev/diffs/4.0.0-preview1.diff b/dev/diffs/4.0.0-preview1.diff
index 2fec4297d..a59c85c21 100644
--- a/dev/diffs/4.0.0-preview1.diff
+++ b/dev/diffs/4.0.0-preview1.diff
@@ -268,7 +268,7 @@ index d023fb82185..0f4f03bda6c 100644
withTempView("t0", "t1", "t2") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
-index 620ee430cab..f5df9218fc1 100644
+index 620ee430cab..9d383a4bff9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.util.AUTO_GENERATED_ALIAS
@@ -289,16 +289,6 @@ index 620ee430cab..f5df9218fc1 100644
}
assert(exchangePlans.length == 1)
}
-@@ -1275,7 +1275,8 @@ class DataFrameAggregateSuite extends QueryTest
- }
- }
-
-- test("SPARK-32038: NormalizeFloatingNumbers should work on distinct
aggregate") {
-+ test("SPARK-32038: NormalizeFloatingNumbers should work on distinct
aggregate",
-+ IgnoreComet("TODO:
https://github.com/apache/datafusion-comet/issues/1824")) {
- withTempView("view") {
- val nan1 = java.lang.Float.intBitsToFloat(0x7f800001)
- val nan2 = java.lang.Float.intBitsToFloat(0x7fffffff)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index f6fd6b501d7..11870c85d82 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -374,6 +364,44 @@ index 760ee802608..b77133ffd37 100644
}
assert(exchanges.size == 2)
}
+diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+index e3aff9b36ae..06196517935 100644
+--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
++++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+@@ -24,8 +24,9 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression
+ import org.apache.spark.sql.catalyst.optimizer.TransposeWindow
+ import org.apache.spark.sql.catalyst.plans.logical.{Window => LogicalWindow}
+ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
++import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+-import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, Exchange, ShuffleExchangeExec}
++import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, Exchange, ShuffleExchangeExec, ShuffleExchangeLike}
+ import org.apache.spark.sql.execution.window.WindowExec
+ import org.apache.spark.sql.expressions.{Aggregator, MutableAggregationBuffer, UserDefinedAggregateFunction, Window}
+ import org.apache.spark.sql.functions._
+@@ -1142,10 +1143,12 @@ class DataFrameWindowFunctionsSuite extends QueryTest
+ }
+
+ def isShuffleExecByRequirement(
+- plan: ShuffleExchangeExec,
++ plan: ShuffleExchangeLike,
+ desiredClusterColumns: Seq[String]): Boolean = plan match {
+ case ShuffleExchangeExec(op: HashPartitioning, _, ENSURE_REQUIREMENTS,
_) =>
+ partitionExpressionsColumns(op.expressions) === desiredClusterColumns
++ case CometShuffleExchangeExec(op: HashPartitioning, _, _,
ENSURE_REQUIREMENTS, _, _) =>
++ partitionExpressionsColumns(op.expressions) === desiredClusterColumns
+ case _ => false
+ }
+
+@@ -1168,7 +1171,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest
+ val shuffleByRequirement = windowed.queryExecution.executedPlan.exists {
+ case w: WindowExec =>
+ w.child.exists {
+- case s: ShuffleExchangeExec => isShuffleExecByRequirement(s,
Seq("key1", "key2"))
++ case s: ShuffleExchangeLike => isShuffleExecByRequirement(s,
Seq("key1", "key2"))
+ case _ => false
+ }
+ case _ => false
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 16a493b5290..3f0b70e2d59 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -397,7 +425,7 @@ index 16a493b5290..3f0b70e2d59 100644
assert(exchanges.size == 2)
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
-index 2c24cc7d570..3e6a8632fa6 100644
+index 2c24cc7d570..21d36ebc6f5 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
@@ -428,17 +456,37 @@ index 2c24cc7d570..3e6a8632fa6 100644
Given("disable broadcast pruning and disable subquery duplication")
withSQLConf(
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true",
-@@ -1215,7 +1220,8 @@ abstract class DynamicPartitionPruningSuiteBase
+@@ -1027,7 +1032,8 @@ abstract class DynamicPartitionPruningSuiteBase
+ }
+ }
+
+- test("avoid reordering broadcast join keys to match input hash
partitioning") {
++ test("avoid reordering broadcast join keys to match input hash
partitioning",
++ IgnoreComet("TODO:
https://github.com/apache/datafusion-comet/issues/1839")) {
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key ->
"false",
+ SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+ withTable("large", "dimTwo", "dimThree") {
+@@ -1215,7 +1221,8 @@ abstract class DynamicPartitionPruningSuiteBase
}
test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " +
- "canonicalization and exchange reuse") {
+ "canonicalization and exchange reuse",
-+ IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #1737")) {
++ IgnoreComet("TODO:
https://github.com/apache/datafusion-comet/issues/1839")) {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key ->
"true") {
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
val df = sql(
-@@ -1455,7 +1461,8 @@ abstract class DynamicPartitionPruningSuiteBase
+@@ -1424,7 +1431,8 @@ abstract class DynamicPartitionPruningSuiteBase
+ }
+ }
+
+- test("SPARK-34637: DPP side broadcast query stage is created firstly") {
++ test("SPARK-34637: DPP side broadcast query stage is created firstly",
++ IgnoreComet("TODO:
https://github.com/apache/datafusion-comet/issues/1839")) {
+ withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key ->
"true") {
+ val df = sql(
+ """ WITH v as (
+@@ -1455,7 +1463,8 @@ abstract class DynamicPartitionPruningSuiteBase
}
}
@@ -448,7 +496,7 @@ index 2c24cc7d570..3e6a8632fa6 100644
val df = sql(
"""
|SELECT s.store_id, f.product_id
-@@ -1730,6 +1737,8 @@ abstract class DynamicPartitionPruningV1Suite extends
DynamicPartitionPruningDat
+@@ -1730,6 +1739,8 @@ abstract class DynamicPartitionPruningV1Suite extends
DynamicPartitionPruningDat
case s: BatchScanExec =>
// we use f1 col for v2 tables due to schema pruning
s.output.exists(_.exists(_.argString(maxFields =
100).contains("f1")))
@@ -649,7 +697,7 @@ index 53e47f428c3..a55d8f0c161 100644
assert(shuffleMergeJoins.size == 1)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
-index fcb937d82ba..fafe8e8d08b 100644
+index fcb937d82ba..fc208087a69 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -29,7 +29,8 @@ import
org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
@@ -775,7 +823,7 @@ index fcb937d82ba..fafe8e8d08b 100644
}.size === 1)
// Same result between shuffled hash join and sort merge join
checkAnswer(shjDF, smjResult)
-@@ -1435,13 +1449,19 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
+@@ -1435,13 +1449,20 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
assert(shjCodegenDF.queryExecution.executedPlan.collect {
case WholeStageCodegenExec(_ : ShuffledHashJoinExec) => true
case WholeStageCodegenExec(ProjectExec(_, _ :
ShuffledHashJoinExec)) => true
@@ -783,6 +831,7 @@ index fcb937d82ba..fafe8e8d08b 100644
+ true
+ case WholeStageCodegenExec(ColumnarToRowExec(
+ InputAdapter(CometProjectExec(_, _, _, _, _: CometHashJoinExec,
_)))) => true
++ case _: CometHashJoinExec => true
}.size === 1)
checkAnswer(shjCodegenDF, Seq.empty)
@@ -796,7 +845,7 @@ index fcb937d82ba..fafe8e8d08b 100644
checkAnswer(shjNonCodegenDF, Seq.empty)
}
}
-@@ -1489,7 +1509,8 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
+@@ -1489,7 +1510,8 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
val plan = sql(getAggQuery(selectExpr,
joinType)).queryExecution.executedPlan
assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true
}.size === 1)
// Have shuffle before aggregation
@@ -806,7 +855,7 @@ index fcb937d82ba..fafe8e8d08b 100644
}
def getJoinQuery(selectExpr: String, joinType: String): String = {
-@@ -1518,9 +1539,12 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
+@@ -1518,9 +1540,12 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
}
val plan = sql(getJoinQuery(selectExpr,
joinType)).queryExecution.executedPlan
assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true
}.size === 1)
@@ -821,7 +870,7 @@ index fcb937d82ba..fafe8e8d08b 100644
}
// Test output ordering is not preserved
-@@ -1529,9 +1553,12 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
+@@ -1529,9 +1554,12 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
val selectExpr = "/*+ BROADCAST(left_t) */ k1 as k0"
val plan = sql(getJoinQuery(selectExpr,
joinType)).queryExecution.executedPlan
assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true
}.size === 1)
@@ -836,7 +885,7 @@ index fcb937d82ba..fafe8e8d08b 100644
}
// Test singe partition
-@@ -1541,7 +1568,8 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
+@@ -1541,7 +1569,8 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
|FROM range(0, 10, 1, 1) t1 FULL OUTER JOIN range(0, 10, 1, 1) t2
|""".stripMargin)
val plan = fullJoinDF.queryExecution.executedPlan
@@ -846,7 +895,7 @@ index fcb937d82ba..fafe8e8d08b 100644
checkAnswer(fullJoinDF, Row(100))
}
}
-@@ -1586,6 +1614,9 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
+@@ -1586,6 +1615,9 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
Seq(semiJoinDF, antiJoinDF).foreach { df =>
assert(collect(df.queryExecution.executedPlan) {
case j: ShuffledHashJoinExec if j.ignoreDuplicatedKey ==
ignoreDuplicatedKey => true
@@ -856,7 +905,7 @@ index fcb937d82ba..fafe8e8d08b 100644
}.size == 1)
}
}
-@@ -1630,14 +1661,20 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
+@@ -1630,14 +1662,20 @@ class JoinSuite extends QueryTest with
SharedSparkSession with AdaptiveSparkPlan
test("SPARK-43113: Full outer join with duplicate stream-side references in
condition (SMJ)") {
def check(plan: SparkPlan): Unit = {
@@ -879,7 +928,7 @@ index fcb937d82ba..fafe8e8d08b 100644
}
dupStreamSideColTest("SHUFFLE_HASH", check)
}
-@@ -1773,7 +1810,8 @@ class ThreadLeakInSortMergeJoinSuite
+@@ -1773,7 +1811,8 @@ class ThreadLeakInSortMergeJoinSuite
sparkConf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, 20))
}
@@ -1451,6 +1500,28 @@ index 47679ed7865..9ffbaecb98e 100644
}.length == hashAggCount)
assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s
}.length == sortAggCount)
}
+diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
+index eec396b2e39..bf3f1c769d6 100644
+---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
++++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
+@@ -18,7 +18,7 @@
+ package org.apache.spark.sql.execution
+
+ import org.apache.spark.TestUtils.assertSpilled
+-import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
++import org.apache.spark.sql.{AnalysisException, IgnoreComet, QueryTest, Row}
+ import
org.apache.spark.sql.internal.SQLConf.{WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD,
WINDOW_EXEC_BUFFER_SPILL_THRESHOLD}
+ import org.apache.spark.sql.test.SharedSparkSession
+
+@@ -470,7 +470,7 @@ class SQLWindowFunctionSuite extends QueryTest with
SharedSparkSession {
+ Row(1, 3, null) :: Row(2, null, 4) :: Nil)
+ }
+
+- test("test with low buffer spill threshold") {
++ test("test with low buffer spill threshold", IgnoreComet("Comet does not
support spilling")) {
+ val nums = sparkContext.parallelize(1 to 10).map(x => (x, x %
2)).toDF("x", "y")
+ nums.createOrReplaceTempView("nums")
+
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
index 966f4e74712..8017e22d7f8 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
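A recurring pattern in the test diffs above (see the DataFrameWindowFunctionsSuite hunk) is widening matches from ShuffleExchangeExec to the ShuffleExchangeLike trait so that CometShuffleExchangeExec also satisfies the check. A self-contained sketch of that shape, with simplified stand-in types rather than the real Spark and Comet classes:

    // Simplified stand-ins; only the shape of the match mirrors the patched helper.
    trait ShuffleExchangeLike { def clusterColumns: Seq[String] }
    case class SparkShuffleExchange(clusterColumns: Seq[String]) extends ShuffleExchangeLike
    case class CometShuffleExchange(clusterColumns: Seq[String]) extends ShuffleExchangeLike

    // Accept either implementation, as the widened helper now does.
    def isShuffleByRequirement(plan: ShuffleExchangeLike, desired: Seq[String]): Boolean =
      plan match {
        case SparkShuffleExchange(cols) => cols == desired
        case CometShuffleExchange(cols) => cols == desired
        case _                          => false
      }

    // isShuffleByRequirement(CometShuffleExchange(Seq("key1", "key2")), Seq("key1", "key2")) == true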
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
index c9dd4f17b..54e6e6364 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.types.{DoubleType, FloatType}
import org.apache.comet.{CometConf, ExtendedExplainInfo}
-import org.apache.comet.CometConf.{COMET_ANSI_MODE_ENABLED,
COMET_SHUFFLE_FALLBACK_TO_COLUMNAR}
+import org.apache.comet.CometConf.COMET_ANSI_MODE_ENABLED
import org.apache.comet.CometSparkSessionExtensions.{createMessage,
getCometBroadcastNotEnabledReason, getCometShuffleNotEnabledReason,
isANSIEnabled, isCometBroadCastForceEnabled, isCometExecEnabled,
isCometJVMShuffleMode, isCometLoaded, isCometNativeShuffleMode, isCometScan,
isCometShuffleEnabled, isSpark40Plus, shouldApplySparkToColumnar, withInfo}
import org.apache.comet.serde.OperatorOuterClass.Operator
import org.apache.comet.serde.QueryPlanSerde
@@ -491,16 +491,9 @@ case class CometExecRule(session: SparkSession) extends
Rule[SparkPlan] {
None
}
- // this is a temporary workaround because some Spark SQL tests fail
- // when we enable COMET_SHUFFLE_FALLBACK_TO_COLUMNAR due to valid bugs
- // that we had not previously seen
- val tryColumnarNext =
- !nativePrecondition || (nativePrecondition && nativeShuffle.isEmpty
&&
- COMET_SHUFFLE_FALLBACK_TO_COLUMNAR.get(conf))
-
val nativeOrColumnarShuffle = if (nativeShuffle.isDefined) {
nativeShuffle
- } else if (tryColumnarNext) {
+ } else {
// Columnar shuffle for regular Spark operators (not Comet) and
Comet operators
// (if configured).
// If the child of ShuffleExchangeExec is also a
ShuffleExchangeExec, we should not
@@ -526,8 +519,6 @@ case class CometExecRule(session: SparkSession) extends
Rule[SparkPlan] {
} else {
None
}
- } else {
- None
}
if (nativeOrColumnarShuffle.isDefined) {
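With the COMET_SHUFFLE_FALLBACK_TO_COLUMNAR gate removed above, CometExecRule simply prefers native shuffle and otherwise falls through to the columnar-shuffle path. A minimal sketch of that control flow, with hypothetical helper names standing in for the rule's real plan handling:

    // Hypothetical helpers; the real rule inspects Spark physical plans.
    sealed trait Shuffle
    case object NativeShuffle extends Shuffle
    case object ColumnarShuffle extends Shuffle

    def tryNativeShuffle(nativeSupported: Boolean): Option[Shuffle] =
      if (nativeSupported) Some(NativeShuffle) else None

    def tryColumnarShuffle(columnarSupported: Boolean): Option[Shuffle] =
      if (columnarSupported) Some(ColumnarShuffle) else None

    // After this patch there is no config gate in between: columnar shuffle is
    // always the fallback when native shuffle is not chosen.
    def chooseShuffle(nativeSupported: Boolean, columnarSupported: Boolean): Option[Shuffle] =
      tryNativeShuffle(nativeSupported).orElse(tryColumnarShuffle(columnarSupported))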
diff --git
a/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala
b/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala
index ecc0823d6..a1a96d321 100644
---
a/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala
+++
b/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.comet.{CometCollectLimitExec,
CometColumnarToRowExec
import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle,
CometShuffleExchangeExec}
import org.apache.spark.sql.execution.{ColumnarToRowExec, RowToColumnarExec,
SparkPlan}
import org.apache.spark.sql.execution.adaptive.QueryStageExec
+import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.comet.CometConf
@@ -56,7 +57,8 @@ case class EliminateRedundantTransitions(session:
SparkSession) extends Rule[Spa
override def apply(plan: SparkPlan): SparkPlan = {
val newPlan = _apply(plan)
if (showTransformations) {
- logInfo(s"\nINPUT: $plan\nOUTPUT: $newPlan")
+ // scalastyle:off println
+ System.err.println(s"EliminateRedundantTransitions:\nINPUT:
$plan\nOUTPUT: $newPlan")
}
newPlan
}
@@ -64,7 +66,7 @@ case class EliminateRedundantTransitions(session:
SparkSession) extends Rule[Spa
private def _apply(plan: SparkPlan): SparkPlan = {
val eliminatedPlan = plan transformUp {
case ColumnarToRowExec(shuffleExchangeExec: CometShuffleExchangeExec)
- if (plan.conf.adaptiveExecutionEnabled) =>
+ if plan.conf.adaptiveExecutionEnabled =>
shuffleExchangeExec
case ColumnarToRowExec(sparkToColumnar: CometSparkToColumnarExec) =>
if (sparkToColumnar.child.supportsColumnar) {
@@ -112,6 +114,7 @@ case class EliminateRedundantTransitions(session:
SparkSession) extends Rule[Spa
private def hasCometNativeChild(op: SparkPlan): Boolean = {
op match {
case c: QueryStageExec => hasCometNativeChild(c.plan)
+ case c: ReusedExchangeExec => hasCometNativeChild(c.child)
case _ => op.exists(_.isInstanceOf[CometPlan])
}
}
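The EliminateRedundantTransitions hunk above teaches hasCometNativeChild to look through ReusedExchangeExec the same way it already looks through query stages, so a reused exchange no longer hides a native Comet child. A toy model of that traversal (the node types below are simplified stand-ins, not the Spark classes):

    // Toy plan model: wrapper nodes must be unwrapped before checking for Comet nodes.
    sealed trait Plan { def children: Seq[Plan] }
    case class CometNode(children: Seq[Plan] = Nil) extends Plan
    case class SparkNode(children: Seq[Plan] = Nil) extends Plan
    case class QueryStage(wrapped: Plan) extends Plan { def children: Seq[Plan] = Seq(wrapped) }
    case class ReusedExchange(wrapped: Plan) extends Plan { def children: Seq[Plan] = Seq(wrapped) }

    def hasCometNativeChild(op: Plan): Boolean = op match {
      case QueryStage(p)     => hasCometNativeChild(p)   // already unwrapped before this patch
      case ReusedExchange(p) => hasCometNativeChild(p)   // newly unwrapped by this patch
      case _: CometNode      => true
      case _                 => op.children.exists(hasCometNativeChild)
    }

    // hasCometNativeChild(ReusedExchange(CometNode())) == true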
diff --git
a/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala
b/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala
index 0391a1c3b..95e03ca69 100644
---
a/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala
+++
b/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.comet.util.{Utils => CometUtils}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.{CodegenSupport,
ColumnarToRowTransition, SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec
+import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.execution.vectorized.{ConstantColumnVector,
WritableColumnVector}
import org.apache.spark.sql.types._
@@ -172,6 +173,7 @@ case class CometColumnarToRowExec(child: SparkPlan)
op match {
case b: CometBroadcastExchangeExec => Some(b)
case b: BroadcastQueryStageExec => findCometBroadcastExchange(b.plan)
+ case b: ReusedExchangeExec => findCometBroadcastExchange(b.child)
case _ =>
op.children.collectFirst(Function.unlift(findCometBroadcastExchange))
}
}
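CometColumnarToRowExec gets the analogous fix: findCometBroadcastExchange now also descends into ReusedExchangeExec before falling back to scanning children with collectFirst(Function.unlift(...)). That last idiom, turning an A => Option[B] into the partial function collectFirst expects, is plain standard-library Scala:

    import scala.util.Try

    // Function.unlift lifts an A => Option[B] into the PartialFunction[A, B]
    // that collectFirst wants; the first input yielding Some wins.
    def parseInt(s: String): Option[Int] = Try(s.toInt).toOption

    val inputs = Seq("x", "y", "42", "7")
    val firstNumber: Option[Int] = inputs.collectFirst(Function.unlift(parseInt))
    // firstNumber == Some(42)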
diff --git
a/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala
b/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala
index 8d084fd75..0e582ddab 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala
@@ -188,6 +188,7 @@ class CometTPCDSQuerySuite
conf.set(CometConf.COMET_NATIVE_SCAN_ENABLED.key, "true")
conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true")
conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "15g")
+ conf.set(CometConf.COMET_EXPLAIN_TRANSFORMATIONS.key, "true")
conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
conf.set(MEMORY_OFFHEAP_ENABLED.key, "true")
conf.set(MEMORY_OFFHEAP_SIZE.key, "15g")
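The TPCDS suite additionally turns on CometConf.COMET_EXPLAIN_TRANSFORMATIONS, which presumably drives the showTransformations logging touched in EliminateRedundantTransitions above. Assuming Comet is on the classpath, enabling it in an ordinary SparkConf would look roughly like the test setup:

    import org.apache.spark.SparkConf
    import org.apache.comet.CometConf

    // Mirrors the suite's configuration above; what the flag prints is whatever
    // the showTransformations branch emits (INPUT/OUTPUT plans).
    val conf = new SparkConf()
      .set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true")
      .set(CometConf.COMET_EXPLAIN_TRANSFORMATIONS.key, "true")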
diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
index 0b15def98..2a7983f0b 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
@@ -78,7 +78,6 @@ abstract class CometTestBase
conf.set(CometConf.COMET_ENABLED.key, "true")
conf.set(CometConf.COMET_EXEC_ENABLED.key, "true")
conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true")
- conf.set(CometConf.COMET_SHUFFLE_FALLBACK_TO_COLUMNAR.key, "true")
conf.set(CometConf.COMET_SPARK_TO_ARROW_ENABLED.key, "true")
conf.set(CometConf.COMET_NATIVE_SCAN_ENABLED.key, "true")
conf.set(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key, "true")
diff --git
a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
index 9f611612f..cf887a101 100644
---
a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
+++
b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
@@ -272,7 +272,6 @@ trait CometPlanStabilitySuite extends
DisableAdaptiveExecutionSuite with TPCDSBa
CometConf.COMET_DPP_FALLBACK_ENABLED.key -> "false",
SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> dppEnabled.toString,
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
- CometConf.COMET_SHUFFLE_FALLBACK_TO_COLUMNAR.key -> "true",
CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key ->
"true",
CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true", // needed for
v1.4/q9, v1.4/q44, v2.7.0/q6, v2.7.0/q64
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "10MB") {