This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 068b900eb fix: Remove COMET_SHUFFLE_FALLBACK_TO_COLUMNAR hack (#1865)
068b900eb is described below

commit 068b900ebab231ddf2d7168654aa53962f3e7bb9
Author: Andy Grove <[email protected]>
AuthorDate: Mon Jun 9 16:01:29 2025 -0600

    fix: Remove COMET_SHUFFLE_FALLBACK_TO_COLUMNAR hack (#1865)
---
 .../main/scala/org/apache/comet/CometConf.scala    |   7 --
 dev/diffs/3.4.3.diff                               | 119 ++++++++++++++----
 dev/diffs/3.5.4.diff                               | 107 +++++++++++++----
 dev/diffs/3.5.5.diff                               | 133 +++++++++++++++++----
 dev/diffs/3.5.6.diff                               | 133 +++++++++++++++++----
 dev/diffs/4.0.0-preview1.diff                      | 121 +++++++++++++++----
 .../org/apache/comet/rules/CometExecRule.scala     |  13 +-
 .../rules/EliminateRedundantTransitions.scala      |   7 +-
 .../spark/sql/comet/CometColumnarToRowExec.scala   |   2 +
 .../apache/spark/sql/CometTPCDSQuerySuite.scala    |   1 +
 .../scala/org/apache/spark/sql/CometTestBase.scala |   1 -
 .../spark/sql/comet/CometPlanStabilitySuite.scala  |   1 -
 12 files changed, 509 insertions(+), 136 deletions(-)

diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala 
b/common/src/main/scala/org/apache/comet/CometConf.scala
index 9807ebe04..317303eb7 100644
--- a/common/src/main/scala/org/apache/comet/CometConf.scala
+++ b/common/src/main/scala/org/apache/comet/CometConf.scala
@@ -289,13 +289,6 @@ object CometConf extends ShimCometConf {
     .checkValues(Set("native", "jvm", "auto"))
     .createWithDefault("auto")
 
-  val COMET_SHUFFLE_FALLBACK_TO_COLUMNAR: ConfigEntry[Boolean] =
-    conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.fallbackToColumnar")
-      .doc("Whether to try falling back to columnar shuffle when native 
shuffle is not supported")
-      .internal()
-      .booleanConf
-      .createWithDefault(false)
-
   val COMET_EXEC_BROADCAST_FORCE_ENABLED: ConfigEntry[Boolean] =
     conf(s"$COMET_EXEC_CONFIG_PREFIX.broadcast.enabled")
       .doc(
diff --git a/dev/diffs/3.4.3.diff b/dev/diffs/3.4.3.diff
index 18e91d9da..dda075f02 100644
--- a/dev/diffs/3.4.3.diff
+++ b/dev/diffs/3.4.3.diff
@@ -247,7 +247,7 @@ index cf40e944c09..bdd5be4f462 100644
  
    test("A cached table preserves the partitioning and ordering of its cached 
SparkPlan") {
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
-index 1cc09c3d7fc..b85b53a9688 100644
+index 1cc09c3d7fc..f031fa45c33 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
 @@ -27,7 +27,7 @@ import org.apache.spark.SparkException
@@ -268,16 +268,6 @@ index 1cc09c3d7fc..b85b53a9688 100644
        }
        assert(exchangePlans.length == 1)
      }
-@@ -1100,7 +1100,8 @@ class DataFrameAggregateSuite extends QueryTest
-     }
-   }
- 
--  test("SPARK-32038: NormalizeFloatingNumbers should work on distinct 
aggregate") {
-+  test("SPARK-32038: NormalizeFloatingNumbers should work on distinct 
aggregate",
-+    IgnoreComet("TODO: 
https://github.com/apache/datafusion-comet/issues/1824";)) {
-     withTempView("view") {
-       val nan1 = java.lang.Float.intBitsToFloat(0x7f800001)
-       val nan2 = java.lang.Float.intBitsToFloat(0x7fffffff)
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
 index 56e9520fdab..917932336df 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -363,6 +353,44 @@ index a9f69ab28a1..5d9d4f2cb83 100644
      withTable("tbl") {
        sql(
          """
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+index 433b4741979..07148eee480 100644
+--- 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
++++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+@@ -23,8 +23,9 @@ import org.apache.spark.TestUtils.{assertNotSpilled, 
assertSpilled}
+ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression, Lag, Literal, NonFoldableLiteral}
+ import org.apache.spark.sql.catalyst.optimizer.TransposeWindow
+ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
++import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+-import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, 
Exchange, ShuffleExchangeExec}
++import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, 
Exchange, ShuffleExchangeExec, ShuffleExchangeLike}
+ import org.apache.spark.sql.execution.window.WindowExec
+ import org.apache.spark.sql.expressions.{Aggregator, 
MutableAggregationBuffer, UserDefinedAggregateFunction, Window}
+ import org.apache.spark.sql.functions._
+@@ -1186,10 +1187,12 @@ class DataFrameWindowFunctionsSuite extends QueryTest
+     }
+ 
+     def isShuffleExecByRequirement(
+-        plan: ShuffleExchangeExec,
++        plan: ShuffleExchangeLike,
+         desiredClusterColumns: Seq[String]): Boolean = plan match {
+       case ShuffleExchangeExec(op: HashPartitioning, _, ENSURE_REQUIREMENTS) 
=>
+         partitionExpressionsColumns(op.expressions) === desiredClusterColumns
++      case CometShuffleExchangeExec(op: HashPartitioning, _, _, 
ENSURE_REQUIREMENTS, _, _) =>
++        partitionExpressionsColumns(op.expressions) === desiredClusterColumns
+       case _ => false
+     }
+ 
+@@ -1212,7 +1215,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest
+       val shuffleByRequirement = windowed.queryExecution.executedPlan.exists {
+         case w: WindowExec =>
+           w.child.exists {
+-            case s: ShuffleExchangeExec => isShuffleExecByRequirement(s, 
Seq("key1", "key2"))
++            case s: ShuffleExchangeLike => isShuffleExecByRequirement(s, 
Seq("key1", "key2"))
+             case _ => false
+           }
+         case _ => false
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
 index daef11ae4d6..9f3cc9181f2 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -386,7 +414,7 @@ index daef11ae4d6..9f3cc9181f2 100644
      assert(exchanges.size == 2)
    }
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
-index f33432ddb6f..cc5224af735 100644
+index f33432ddb6f..1925aac8d97 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
 @@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
@@ -417,17 +445,37 @@ index f33432ddb6f..cc5224af735 100644
      Given("disable broadcast pruning and disable subquery duplication")
      withSQLConf(
        SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true",
-@@ -1215,7 +1220,8 @@ abstract class DynamicPartitionPruningSuiteBase
+@@ -1027,7 +1032,8 @@ abstract class DynamicPartitionPruningSuiteBase
+     }
+   }
+ 
+-  test("avoid reordering broadcast join keys to match input hash 
partitioning") {
++  test("avoid reordering broadcast join keys to match input hash 
partitioning",
++    IgnoreComet("TODO: 
https://github.com/apache/datafusion-comet/issues/1839";)) {
+     withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> 
"false",
+       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+       withTable("large", "dimTwo", "dimThree") {
+@@ -1215,7 +1221,8 @@ abstract class DynamicPartitionPruningSuiteBase
    }
  
    test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " +
 -    "canonicalization and exchange reuse") {
 +    "canonicalization and exchange reuse",
-+    IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #1737")) {
++    IgnoreComet("TODO: 
https://github.com/apache/datafusion-comet/issues/1839";)) {
      withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> 
"true") {
        withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
          val df = sql(
-@@ -1729,6 +1735,8 @@ abstract class DynamicPartitionPruningV1Suite extends 
DynamicPartitionPruningDat
+@@ -1423,7 +1430,8 @@ abstract class DynamicPartitionPruningSuiteBase
+     }
+   }
+ 
+-  test("SPARK-34637: DPP side broadcast query stage is created firstly") {
++  test("SPARK-34637: DPP side broadcast query stage is created firstly",
++    IgnoreComet("TODO: 
https://github.com/apache/datafusion-comet/issues/1839";)) {
+     withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> 
"true") {
+       val df = sql(
+         """ WITH v as (
+@@ -1729,6 +1737,8 @@ abstract class DynamicPartitionPruningV1Suite extends 
DynamicPartitionPruningDat
                case s: BatchScanExec =>
                  // we use f1 col for v2 tables due to schema pruning
                  s.output.exists(_.exists(_.argString(maxFields = 
100).contains("f1")))
@@ -611,7 +659,7 @@ index 1792b4c32eb..1616e6f39bd 100644
      assert(shuffleMergeJoins.size == 1)
    }
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
-index 7f062bfb899..b347ef905d2 100644
+index 7f062bfb899..0ed85486e80 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
 @@ -30,7 +30,8 @@ import org.apache.spark.sql.catalyst.TableIdentifier
@@ -707,7 +755,7 @@ index 7f062bfb899..b347ef905d2 100644
        // Same result between shuffled hash join and sort merge join
        checkAnswer(shjDF, smjResult)
      }
-@@ -1282,18 +1292,25 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
+@@ -1282,18 +1292,26 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
        }
  
        // Test shuffled hash join
@@ -722,6 +770,7 @@ index 7f062bfb899..b347ef905d2 100644
 +            true
 +          case WholeStageCodegenExec(ColumnarToRowExec(
 +            InputAdapter(CometProjectExec(_, _, _, _, _: CometHashJoinExec, 
_)))) => true
++          case _: CometHashJoinExec => true
          }.size === 1)
          checkAnswer(shjCodegenDF, Seq.empty)
  
@@ -735,7 +784,7 @@ index 7f062bfb899..b347ef905d2 100644
            checkAnswer(shjNonCodegenDF, Seq.empty)
          }
        }
-@@ -1341,7 +1358,8 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
+@@ -1341,7 +1359,8 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
            val plan = sql(getAggQuery(selectExpr, 
joinType)).queryExecution.executedPlan
            assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true 
}.size === 1)
            // Have shuffle before aggregation
@@ -745,7 +794,7 @@ index 7f062bfb899..b347ef905d2 100644
        }
  
        def getJoinQuery(selectExpr: String, joinType: String): String = {
-@@ -1370,9 +1388,12 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
+@@ -1370,9 +1389,12 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
            }
            val plan = sql(getJoinQuery(selectExpr, 
joinType)).queryExecution.executedPlan
            assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true 
}.size === 1)
@@ -760,7 +809,7 @@ index 7f062bfb899..b347ef905d2 100644
        }
  
        // Test output ordering is not preserved
-@@ -1381,9 +1402,12 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
+@@ -1381,9 +1403,12 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
            val selectExpr = "/*+ BROADCAST(left_t) */ k1 as k0"
            val plan = sql(getJoinQuery(selectExpr, 
joinType)).queryExecution.executedPlan
            assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true 
}.size === 1)
@@ -775,7 +824,7 @@ index 7f062bfb899..b347ef905d2 100644
        }
  
        // Test singe partition
-@@ -1393,7 +1417,8 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
+@@ -1393,7 +1418,8 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
             |FROM range(0, 10, 1, 1) t1 FULL OUTER JOIN range(0, 10, 1, 1) t2
             |""".stripMargin)
        val plan = fullJoinDF.queryExecution.executedPlan
@@ -785,7 +834,7 @@ index 7f062bfb899..b347ef905d2 100644
        checkAnswer(fullJoinDF, Row(100))
      }
    }
-@@ -1438,6 +1463,9 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
+@@ -1438,6 +1464,9 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
            Seq(semiJoinDF, antiJoinDF).foreach { df =>
              assert(collect(df.queryExecution.executedPlan) {
                case j: ShuffledHashJoinExec if j.ignoreDuplicatedKey == 
ignoreDuplicatedKey => true
@@ -795,7 +844,7 @@ index 7f062bfb899..b347ef905d2 100644
              }.size == 1)
            }
        }
-@@ -1482,14 +1510,20 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
+@@ -1482,14 +1511,20 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
  
    test("SPARK-43113: Full outer join with duplicate stream-side references in 
condition (SMJ)") {
      def check(plan: SparkPlan): Unit = {
@@ -818,7 +867,7 @@ index 7f062bfb899..b347ef905d2 100644
      }
      dupStreamSideColTest("SHUFFLE_HASH", check)
    }
-@@ -1605,7 +1639,8 @@ class ThreadLeakInSortMergeJoinSuite
+@@ -1605,7 +1640,8 @@ class ThreadLeakInSortMergeJoinSuite
        sparkConf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, 20))
    }
  
@@ -1280,6 +1329,28 @@ index 47679ed7865..9ffbaecb98e 100644
      }.length == hashAggCount)
      assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s 
}.length == sortAggCount)
    }
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
+index eec396b2e39..bf3f1c769d6 100644
+--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
++++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
+@@ -18,7 +18,7 @@
+ package org.apache.spark.sql.execution
+ 
+ import org.apache.spark.TestUtils.assertSpilled
+-import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
++import org.apache.spark.sql.{AnalysisException, IgnoreComet, QueryTest, Row}
+ import 
org.apache.spark.sql.internal.SQLConf.{WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD, 
WINDOW_EXEC_BUFFER_SPILL_THRESHOLD}
+ import org.apache.spark.sql.test.SharedSparkSession
+ 
+@@ -470,7 +470,7 @@ class SQLWindowFunctionSuite extends QueryTest with 
SharedSparkSession {
+       Row(1, 3, null) :: Row(2, null, 4) :: Nil)
+   }
+ 
+-  test("test with low buffer spill threshold") {
++  test("test with low buffer spill threshold", IgnoreComet("Comet does not 
support spilling")) {
+     val nums = sparkContext.parallelize(1 to 10).map(x => (x, x % 
2)).toDF("x", "y")
+     nums.createOrReplaceTempView("nums")
+ 
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
 index b14f4a405f6..ab7baf434a5 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
diff --git a/dev/diffs/3.5.4.diff b/dev/diffs/3.5.4.diff
index 32d4d617e..369587175 100644
--- a/dev/diffs/3.5.4.diff
+++ b/dev/diffs/3.5.4.diff
@@ -226,7 +226,7 @@ index 9815cb816c9..95b5f9992b0 100644
  
    test("A cached table preserves the partitioning and ordering of its cached 
SparkPlan") {
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
-index 5a8681aed97..db69fde723a 100644
+index 5a8681aed97..da9d25e2eb4 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
 @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical.Expand
@@ -247,16 +247,6 @@ index 5a8681aed97..db69fde723a 100644
        }
        assert(exchangePlans.length == 1)
      }
-@@ -1255,7 +1255,8 @@ class DataFrameAggregateSuite extends QueryTest
-     }
-   }
- 
--  test("SPARK-32038: NormalizeFloatingNumbers should work on distinct 
aggregate") {
-+  test("SPARK-32038: NormalizeFloatingNumbers should work on distinct 
aggregate",
-+    IgnoreComet("TODO: 
https://github.com/apache/datafusion-comet/issues/1824";)) {
-     withTempView("view") {
-       val nan1 = java.lang.Float.intBitsToFloat(0x7f800001)
-       val nan2 = java.lang.Float.intBitsToFloat(0x7fffffff)
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
 index 56e9520fdab..917932336df 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -342,6 +332,44 @@ index 7ee18df3756..64f01a68048 100644
      withTable("tbl") {
        sql(
          """
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+index 47a311c71d5..342e71cfdd4 100644
+--- 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
++++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+@@ -24,8 +24,9 @@ import 
org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression
+ import org.apache.spark.sql.catalyst.optimizer.TransposeWindow
+ import org.apache.spark.sql.catalyst.plans.logical.{Window => LogicalWindow}
+ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
++import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+-import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, 
Exchange, ShuffleExchangeExec}
++import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, 
Exchange, ShuffleExchangeExec, ShuffleExchangeLike}
+ import org.apache.spark.sql.execution.window.WindowExec
+ import org.apache.spark.sql.expressions.{Aggregator, 
MutableAggregationBuffer, UserDefinedAggregateFunction, Window}
+ import org.apache.spark.sql.functions._
+@@ -1187,10 +1188,12 @@ class DataFrameWindowFunctionsSuite extends QueryTest
+     }
+ 
+     def isShuffleExecByRequirement(
+-        plan: ShuffleExchangeExec,
++        plan: ShuffleExchangeLike,
+         desiredClusterColumns: Seq[String]): Boolean = plan match {
+       case ShuffleExchangeExec(op: HashPartitioning, _, ENSURE_REQUIREMENTS, 
_) =>
+         partitionExpressionsColumns(op.expressions) === desiredClusterColumns
++      case CometShuffleExchangeExec(op: HashPartitioning, _, _, 
ENSURE_REQUIREMENTS, _, _) =>
++        partitionExpressionsColumns(op.expressions) === desiredClusterColumns
+       case _ => false
+     }
+ 
+@@ -1213,7 +1216,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest
+       val shuffleByRequirement = windowed.queryExecution.executedPlan.exists {
+         case w: WindowExec =>
+           w.child.exists {
+-            case s: ShuffleExchangeExec => isShuffleExecByRequirement(s, 
Seq("key1", "key2"))
++            case s: ShuffleExchangeLike => isShuffleExecByRequirement(s, 
Seq("key1", "key2"))
+             case _ => false
+           }
+         case _ => false
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
 index f32b32ffc5a..447d7c6416e 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -365,7 +393,7 @@ index f32b32ffc5a..447d7c6416e 100644
      assert(exchanges.size == 2)
    }
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
-index f33432ddb6f..19ce507e82b 100644
+index f33432ddb6f..0fa49fb3f0b 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
 @@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
@@ -422,7 +450,7 @@ index f33432ddb6f..19ce507e82b 100644
  
 -  test("avoid reordering broadcast join keys to match input hash 
partitioning") {
 +  test("avoid reordering broadcast join keys to match input hash 
partitioning",
-+    IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #242")) {
++    IgnoreComet("TODO: 
https://github.com/apache/datafusion-comet/issues/1839";)) {
      withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> 
"false",
        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
        withTable("large", "dimTwo", "dimThree") {
@@ -442,7 +470,7 @@ index f33432ddb6f..19ce507e82b 100644
    test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " +
 -    "canonicalization and exchange reuse") {
 +    "canonicalization and exchange reuse",
-+    IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #242")) {
++    IgnoreComet("TODO: 
https://github.com/apache/datafusion-comet/issues/1839";)) {
      withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> 
"true") {
        withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
          val df = sql(
@@ -482,7 +510,7 @@ index f33432ddb6f..19ce507e82b 100644
  
 -  test("SPARK-34637: DPP side broadcast query stage is created firstly") {
 +  test("SPARK-34637: DPP side broadcast query stage is created firstly",
-+    IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #242")) {
++    IgnoreComet("TODO: 
https://github.com/apache/datafusion-comet/issues/1839";)) {
      withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> 
"true") {
        val df = sql(
          """ WITH v as (
@@ -746,7 +774,7 @@ index 7af826583bd..3c3def1eb67 100644
      assert(shuffleMergeJoins.size == 1)
    }
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
-index 4d256154c85..43f0bebb00c 100644
+index 4d256154c85..66a5473852d 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
 @@ -31,7 +31,8 @@ import 
org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
@@ -872,7 +900,7 @@ index 4d256154c85..43f0bebb00c 100644
            }.size === 1)
            // Same result between shuffled hash join and sort merge join
            checkAnswer(shjDF, smjResult)
-@@ -1432,13 +1446,19 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
+@@ -1432,13 +1446,20 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
          assert(shjCodegenDF.queryExecution.executedPlan.collect {
            case WholeStageCodegenExec(_ : ShuffledHashJoinExec) => true
            case WholeStageCodegenExec(ProjectExec(_, _ : 
ShuffledHashJoinExec)) => true
@@ -880,6 +908,7 @@ index 4d256154c85..43f0bebb00c 100644
 +            true
 +          case WholeStageCodegenExec(ColumnarToRowExec(
 +            InputAdapter(CometProjectExec(_, _, _, _, _: CometHashJoinExec, 
_)))) => true
++          case _: CometHashJoinExec => true
          }.size === 1)
          checkAnswer(shjCodegenDF, Seq.empty)
  
@@ -893,7 +922,7 @@ index 4d256154c85..43f0bebb00c 100644
            checkAnswer(shjNonCodegenDF, Seq.empty)
          }
        }
-@@ -1486,7 +1506,8 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
+@@ -1486,7 +1507,8 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
            val plan = sql(getAggQuery(selectExpr, 
joinType)).queryExecution.executedPlan
            assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true 
}.size === 1)
            // Have shuffle before aggregation
@@ -903,7 +932,7 @@ index 4d256154c85..43f0bebb00c 100644
        }
  
        def getJoinQuery(selectExpr: String, joinType: String): String = {
-@@ -1515,9 +1536,12 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
+@@ -1515,9 +1537,12 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
            }
            val plan = sql(getJoinQuery(selectExpr, 
joinType)).queryExecution.executedPlan
            assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true 
}.size === 1)
@@ -918,7 +947,7 @@ index 4d256154c85..43f0bebb00c 100644
        }
  
        // Test output ordering is not preserved
-@@ -1526,9 +1550,12 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
+@@ -1526,9 +1551,12 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
            val selectExpr = "/*+ BROADCAST(left_t) */ k1 as k0"
            val plan = sql(getJoinQuery(selectExpr, 
joinType)).queryExecution.executedPlan
            assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true 
}.size === 1)
@@ -933,7 +962,7 @@ index 4d256154c85..43f0bebb00c 100644
        }
  
        // Test singe partition
-@@ -1538,7 +1565,8 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
+@@ -1538,7 +1566,8 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
             |FROM range(0, 10, 1, 1) t1 FULL OUTER JOIN range(0, 10, 1, 1) t2
             |""".stripMargin)
        val plan = fullJoinDF.queryExecution.executedPlan
@@ -943,7 +972,7 @@ index 4d256154c85..43f0bebb00c 100644
        checkAnswer(fullJoinDF, Row(100))
      }
    }
-@@ -1583,6 +1611,9 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
+@@ -1583,6 +1612,9 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
            Seq(semiJoinDF, antiJoinDF).foreach { df =>
              assert(collect(df.queryExecution.executedPlan) {
                case j: ShuffledHashJoinExec if j.ignoreDuplicatedKey == 
ignoreDuplicatedKey => true
@@ -953,7 +982,7 @@ index 4d256154c85..43f0bebb00c 100644
              }.size == 1)
            }
        }
-@@ -1627,14 +1658,20 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
+@@ -1627,14 +1659,20 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
  
    test("SPARK-43113: Full outer join with duplicate stream-side references in 
condition (SMJ)") {
      def check(plan: SparkPlan): Unit = {
@@ -976,6 +1005,16 @@ index 4d256154c85..43f0bebb00c 100644
      }
      dupStreamSideColTest("SHUFFLE_HASH", check)
    }
+@@ -1770,7 +1808,8 @@ class ThreadLeakInSortMergeJoinSuite
+       sparkConf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, 20))
+   }
+ 
+-  test("SPARK-47146: thread leak when doing SortMergeJoin (with spill)") {
++  test("SPARK-47146: thread leak when doing SortMergeJoin (with spill)",
++    IgnoreComet("Comet does not support spilling")) {
+ 
+     withSQLConf(
+       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1") {
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala
 index c26757c9cff..d55775f09d7 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala
@@ -1451,6 +1490,28 @@ index 47679ed7865..9ffbaecb98e 100644
      }.length == hashAggCount)
      assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s 
}.length == sortAggCount)
    }
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
+index eec396b2e39..bf3f1c769d6 100644
+--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
++++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
+@@ -18,7 +18,7 @@
+ package org.apache.spark.sql.execution
+ 
+ import org.apache.spark.TestUtils.assertSpilled
+-import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
++import org.apache.spark.sql.{AnalysisException, IgnoreComet, QueryTest, Row}
+ import 
org.apache.spark.sql.internal.SQLConf.{WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD, 
WINDOW_EXEC_BUFFER_SPILL_THRESHOLD}
+ import org.apache.spark.sql.test.SharedSparkSession
+ 
+@@ -470,7 +470,7 @@ class SQLWindowFunctionSuite extends QueryTest with 
SharedSparkSession {
+       Row(1, 3, null) :: Row(2, null, 4) :: Nil)
+   }
+ 
+-  test("test with low buffer spill threshold") {
++  test("test with low buffer spill threshold", IgnoreComet("Comet does not 
support spilling")) {
+     val nums = sparkContext.parallelize(1 to 10).map(x => (x, x % 
2)).toDF("x", "y")
+     nums.createOrReplaceTempView("nums")
+ 
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
 index b14f4a405f6..ab7baf434a5 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
diff --git a/dev/diffs/3.5.5.diff b/dev/diffs/3.5.5.diff
index 9ca531087..75928a077 100644
--- a/dev/diffs/3.5.5.diff
+++ b/dev/diffs/3.5.5.diff
@@ -226,7 +226,7 @@ index 9815cb816c9..95b5f9992b0 100644
  
    test("A cached table preserves the partitioning and ordering of its cached 
SparkPlan") {
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
-index 5a8681aed97..db69fde723a 100644
+index 5a8681aed97..da9d25e2eb4 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
 @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical.Expand
@@ -247,16 +247,6 @@ index 5a8681aed97..db69fde723a 100644
        }
        assert(exchangePlans.length == 1)
      }
-@@ -1255,7 +1255,8 @@ class DataFrameAggregateSuite extends QueryTest
-     }
-   }
- 
--  test("SPARK-32038: NormalizeFloatingNumbers should work on distinct 
aggregate") {
-+  test("SPARK-32038: NormalizeFloatingNumbers should work on distinct 
aggregate",
-+    IgnoreComet("TODO: 
https://github.com/apache/datafusion-comet/issues/1824";)) {
-     withTempView("view") {
-       val nan1 = java.lang.Float.intBitsToFloat(0x7f800001)
-       val nan2 = java.lang.Float.intBitsToFloat(0x7fffffff)
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
 index 56e9520fdab..917932336df 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -342,6 +332,44 @@ index 7ee18df3756..64f01a68048 100644
      withTable("tbl") {
        sql(
          """
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+index 47a311c71d5..342e71cfdd4 100644
+--- 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
++++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+@@ -24,8 +24,9 @@ import 
org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression
+ import org.apache.spark.sql.catalyst.optimizer.TransposeWindow
+ import org.apache.spark.sql.catalyst.plans.logical.{Window => LogicalWindow}
+ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
++import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+-import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, 
Exchange, ShuffleExchangeExec}
++import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, 
Exchange, ShuffleExchangeExec, ShuffleExchangeLike}
+ import org.apache.spark.sql.execution.window.WindowExec
+ import org.apache.spark.sql.expressions.{Aggregator, 
MutableAggregationBuffer, UserDefinedAggregateFunction, Window}
+ import org.apache.spark.sql.functions._
+@@ -1187,10 +1188,12 @@ class DataFrameWindowFunctionsSuite extends QueryTest
+     }
+ 
+     def isShuffleExecByRequirement(
+-        plan: ShuffleExchangeExec,
++        plan: ShuffleExchangeLike,
+         desiredClusterColumns: Seq[String]): Boolean = plan match {
+       case ShuffleExchangeExec(op: HashPartitioning, _, ENSURE_REQUIREMENTS, 
_) =>
+         partitionExpressionsColumns(op.expressions) === desiredClusterColumns
++      case CometShuffleExchangeExec(op: HashPartitioning, _, _, 
ENSURE_REQUIREMENTS, _, _) =>
++        partitionExpressionsColumns(op.expressions) === desiredClusterColumns
+       case _ => false
+     }
+ 
+@@ -1213,7 +1216,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest
+       val shuffleByRequirement = windowed.queryExecution.executedPlan.exists {
+         case w: WindowExec =>
+           w.child.exists {
+-            case s: ShuffleExchangeExec => isShuffleExecByRequirement(s, 
Seq("key1", "key2"))
++            case s: ShuffleExchangeLike => isShuffleExecByRequirement(s, 
Seq("key1", "key2"))
+             case _ => false
+           }
+         case _ => false
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
 index f32b32ffc5a..447d7c6416e 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -365,7 +393,7 @@ index f32b32ffc5a..447d7c6416e 100644
      assert(exchanges.size == 2)
    }
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
-index f33432ddb6f..fe9f74ff8f1 100644
+index f33432ddb6f..0e1499a24ca 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
 @@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
@@ -386,7 +414,37 @@ index f33432ddb6f..fe9f74ff8f1 100644
        case _ => Nil
      }
    }
-@@ -1729,6 +1733,8 @@ abstract class DynamicPartitionPruningV1Suite extends 
DynamicPartitionPruningDat
+@@ -1027,7 +1031,8 @@ abstract class DynamicPartitionPruningSuiteBase
+     }
+   }
+ 
+-  test("avoid reordering broadcast join keys to match input hash 
partitioning") {
++  test("avoid reordering broadcast join keys to match input hash 
partitioning",
++    IgnoreComet("TODO: 
https://github.com/apache/datafusion-comet/issues/1839";)) {
+     withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> 
"false",
+       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+       withTable("large", "dimTwo", "dimThree") {
+@@ -1215,7 +1220,8 @@ abstract class DynamicPartitionPruningSuiteBase
+   }
+ 
+   test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " +
+-    "canonicalization and exchange reuse") {
++    "canonicalization and exchange reuse",
++    IgnoreComet("TODO: 
https://github.com/apache/datafusion-comet/issues/1839";)) {
+     withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> 
"true") {
+       withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+         val df = sql(
+@@ -1423,7 +1429,8 @@ abstract class DynamicPartitionPruningSuiteBase
+     }
+   }
+ 
+-  test("SPARK-34637: DPP side broadcast query stage is created firstly") {
++  test("SPARK-34637: DPP side broadcast query stage is created firstly",
++    IgnoreComet("TODO: 
https://github.com/apache/datafusion-comet/issues/1839";)) {
+     withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> 
"true") {
+       val df = sql(
+         """ WITH v as (
+@@ -1729,6 +1736,8 @@ abstract class DynamicPartitionPruningV1Suite extends 
DynamicPartitionPruningDat
                case s: BatchScanExec =>
                  // we use f1 col for v2 tables due to schema pruning
                  s.output.exists(_.exists(_.argString(maxFields = 
100).contains("f1")))
@@ -563,7 +621,7 @@ index 7af826583bd..3c3def1eb67 100644
      assert(shuffleMergeJoins.size == 1)
    }
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
-index 4d256154c85..43f0bebb00c 100644
+index 4d256154c85..66a5473852d 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
 @@ -31,7 +31,8 @@ import 
org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
@@ -689,7 +747,7 @@ index 4d256154c85..43f0bebb00c 100644
            }.size === 1)
            // Same result between shuffled hash join and sort merge join
            checkAnswer(shjDF, smjResult)
-@@ -1432,13 +1446,19 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
+@@ -1432,13 +1446,20 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
          assert(shjCodegenDF.queryExecution.executedPlan.collect {
            case WholeStageCodegenExec(_ : ShuffledHashJoinExec) => true
            case WholeStageCodegenExec(ProjectExec(_, _ : 
ShuffledHashJoinExec)) => true
@@ -697,6 +755,7 @@ index 4d256154c85..43f0bebb00c 100644
 +            true
 +          case WholeStageCodegenExec(ColumnarToRowExec(
 +            InputAdapter(CometProjectExec(_, _, _, _, _: CometHashJoinExec, 
_)))) => true
++          case _: CometHashJoinExec => true
          }.size === 1)
          checkAnswer(shjCodegenDF, Seq.empty)
  
@@ -710,7 +769,7 @@ index 4d256154c85..43f0bebb00c 100644
            checkAnswer(shjNonCodegenDF, Seq.empty)
          }
        }
-@@ -1486,7 +1506,8 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
+@@ -1486,7 +1507,8 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
            val plan = sql(getAggQuery(selectExpr, 
joinType)).queryExecution.executedPlan
            assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true 
}.size === 1)
            // Have shuffle before aggregation
@@ -720,7 +779,7 @@ index 4d256154c85..43f0bebb00c 100644
        }
  
        def getJoinQuery(selectExpr: String, joinType: String): String = {
-@@ -1515,9 +1536,12 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
+@@ -1515,9 +1537,12 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
            }
            val plan = sql(getJoinQuery(selectExpr, 
joinType)).queryExecution.executedPlan
            assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true 
}.size === 1)
@@ -735,7 +794,7 @@ index 4d256154c85..43f0bebb00c 100644
        }
  
        // Test output ordering is not preserved
-@@ -1526,9 +1550,12 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
+@@ -1526,9 +1551,12 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
            val selectExpr = "/*+ BROADCAST(left_t) */ k1 as k0"
            val plan = sql(getJoinQuery(selectExpr, 
joinType)).queryExecution.executedPlan
            assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true 
}.size === 1)
@@ -750,7 +809,7 @@ index 4d256154c85..43f0bebb00c 100644
        }
  
        // Test singe partition
-@@ -1538,7 +1565,8 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
+@@ -1538,7 +1566,8 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
             |FROM range(0, 10, 1, 1) t1 FULL OUTER JOIN range(0, 10, 1, 1) t2
             |""".stripMargin)
        val plan = fullJoinDF.queryExecution.executedPlan
@@ -760,7 +819,7 @@ index 4d256154c85..43f0bebb00c 100644
        checkAnswer(fullJoinDF, Row(100))
      }
    }
-@@ -1583,6 +1611,9 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
+@@ -1583,6 +1612,9 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
            Seq(semiJoinDF, antiJoinDF).foreach { df =>
              assert(collect(df.queryExecution.executedPlan) {
                case j: ShuffledHashJoinExec if j.ignoreDuplicatedKey == 
ignoreDuplicatedKey => true
@@ -770,7 +829,7 @@ index 4d256154c85..43f0bebb00c 100644
              }.size == 1)
            }
        }
-@@ -1627,14 +1658,20 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
+@@ -1627,14 +1659,20 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
  
    test("SPARK-43113: Full outer join with duplicate stream-side references in 
condition (SMJ)") {
      def check(plan: SparkPlan): Unit = {
@@ -793,6 +852,16 @@ index 4d256154c85..43f0bebb00c 100644
      }
      dupStreamSideColTest("SHUFFLE_HASH", check)
    }
+@@ -1770,7 +1808,8 @@ class ThreadLeakInSortMergeJoinSuite
+       sparkConf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, 20))
+   }
+ 
+-  test("SPARK-47146: thread leak when doing SortMergeJoin (with spill)") {
++  test("SPARK-47146: thread leak when doing SortMergeJoin (with spill)",
++    IgnoreComet("Comet does not support spilling")) {
+ 
+     withSQLConf(
+       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1") {
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala
 index c26757c9cff..d55775f09d7 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala
@@ -1256,6 +1325,28 @@ index 47679ed7865..9ffbaecb98e 100644
      }.length == hashAggCount)
      assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s 
}.length == sortAggCount)
    }
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
+index eec396b2e39..bf3f1c769d6 100644
+--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
++++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
+@@ -18,7 +18,7 @@
+ package org.apache.spark.sql.execution
+ 
+ import org.apache.spark.TestUtils.assertSpilled
+-import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
++import org.apache.spark.sql.{AnalysisException, IgnoreComet, QueryTest, Row}
+ import 
org.apache.spark.sql.internal.SQLConf.{WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD, 
WINDOW_EXEC_BUFFER_SPILL_THRESHOLD}
+ import org.apache.spark.sql.test.SharedSparkSession
+ 
+@@ -470,7 +470,7 @@ class SQLWindowFunctionSuite extends QueryTest with 
SharedSparkSession {
+       Row(1, 3, null) :: Row(2, null, 4) :: Nil)
+   }
+ 
+-  test("test with low buffer spill threshold") {
++  test("test with low buffer spill threshold", IgnoreComet("Comet does not 
support spilling")) {
+     val nums = sparkContext.parallelize(1 to 10).map(x => (x, x % 
2)).toDF("x", "y")
+     nums.createOrReplaceTempView("nums")
+ 
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
 index b14f4a405f6..ab7baf434a5 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
diff --git a/dev/diffs/3.5.6.diff b/dev/diffs/3.5.6.diff
index 42d5dad00..e3ba0a35c 100644
--- a/dev/diffs/3.5.6.diff
+++ b/dev/diffs/3.5.6.diff
@@ -226,7 +226,7 @@ index 9815cb816c9..95b5f9992b0 100644
  
    test("A cached table preserves the partitioning and ordering of its cached 
SparkPlan") {
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
-index 5a8681aed97..db69fde723a 100644
+index 5a8681aed97..da9d25e2eb4 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
 @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical.Expand
@@ -247,16 +247,6 @@ index 5a8681aed97..db69fde723a 100644
        }
        assert(exchangePlans.length == 1)
      }
-@@ -1255,7 +1255,8 @@ class DataFrameAggregateSuite extends QueryTest
-     }
-   }
- 
--  test("SPARK-32038: NormalizeFloatingNumbers should work on distinct 
aggregate") {
-+  test("SPARK-32038: NormalizeFloatingNumbers should work on distinct 
aggregate",
-+    IgnoreComet("TODO: 
https://github.com/apache/datafusion-comet/issues/1824";)) {
-     withTempView("view") {
-       val nan1 = java.lang.Float.intBitsToFloat(0x7f800001)
-       val nan2 = java.lang.Float.intBitsToFloat(0x7fffffff)
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
 index 56e9520fdab..917932336df 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -342,6 +332,44 @@ index 7ee18df3756..64f01a68048 100644
      withTable("tbl") {
        sql(
          """
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+index 47a311c71d5..342e71cfdd4 100644
+--- 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
++++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+@@ -24,8 +24,9 @@ import 
org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression
+ import org.apache.spark.sql.catalyst.optimizer.TransposeWindow
+ import org.apache.spark.sql.catalyst.plans.logical.{Window => LogicalWindow}
+ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
++import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+-import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, 
Exchange, ShuffleExchangeExec}
++import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, 
Exchange, ShuffleExchangeExec, ShuffleExchangeLike}
+ import org.apache.spark.sql.execution.window.WindowExec
+ import org.apache.spark.sql.expressions.{Aggregator, 
MutableAggregationBuffer, UserDefinedAggregateFunction, Window}
+ import org.apache.spark.sql.functions._
+@@ -1187,10 +1188,12 @@ class DataFrameWindowFunctionsSuite extends QueryTest
+     }
+ 
+     def isShuffleExecByRequirement(
+-        plan: ShuffleExchangeExec,
++        plan: ShuffleExchangeLike,
+         desiredClusterColumns: Seq[String]): Boolean = plan match {
+       case ShuffleExchangeExec(op: HashPartitioning, _, ENSURE_REQUIREMENTS, 
_) =>
+         partitionExpressionsColumns(op.expressions) === desiredClusterColumns
++      case CometShuffleExchangeExec(op: HashPartitioning, _, _, 
ENSURE_REQUIREMENTS, _, _) =>
++        partitionExpressionsColumns(op.expressions) === desiredClusterColumns
+       case _ => false
+     }
+ 
+@@ -1213,7 +1216,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest
+       val shuffleByRequirement = windowed.queryExecution.executedPlan.exists {
+         case w: WindowExec =>
+           w.child.exists {
+-            case s: ShuffleExchangeExec => isShuffleExecByRequirement(s, 
Seq("key1", "key2"))
++            case s: ShuffleExchangeLike => isShuffleExecByRequirement(s, 
Seq("key1", "key2"))
+             case _ => false
+           }
+         case _ => false
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
 index f32b32ffc5a..447d7c6416e 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -365,7 +393,7 @@ index f32b32ffc5a..447d7c6416e 100644
      assert(exchanges.size == 2)
    }
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
-index f33432ddb6f..fe9f74ff8f1 100644
+index f33432ddb6f..0e1499a24ca 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
 @@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
@@ -386,7 +414,37 @@ index f33432ddb6f..fe9f74ff8f1 100644
        case _ => Nil
      }
    }
-@@ -1729,6 +1733,8 @@ abstract class DynamicPartitionPruningV1Suite extends 
DynamicPartitionPruningDat
+@@ -1027,7 +1031,8 @@ abstract class DynamicPartitionPruningSuiteBase
+     }
+   }
+ 
+-  test("avoid reordering broadcast join keys to match input hash 
partitioning") {
++  test("avoid reordering broadcast join keys to match input hash 
partitioning",
++    IgnoreComet("TODO: 
https://github.com/apache/datafusion-comet/issues/1839";)) {
+     withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> 
"false",
+       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+       withTable("large", "dimTwo", "dimThree") {
+@@ -1215,7 +1220,8 @@ abstract class DynamicPartitionPruningSuiteBase
+   }
+ 
+   test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " +
+-    "canonicalization and exchange reuse") {
++    "canonicalization and exchange reuse",
++    IgnoreComet("TODO: 
https://github.com/apache/datafusion-comet/issues/1839";)) {
+     withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> 
"true") {
+       withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+         val df = sql(
+@@ -1423,7 +1429,8 @@ abstract class DynamicPartitionPruningSuiteBase
+     }
+   }
+ 
+-  test("SPARK-34637: DPP side broadcast query stage is created firstly") {
++  test("SPARK-34637: DPP side broadcast query stage is created firstly",
++    IgnoreComet("TODO: 
https://github.com/apache/datafusion-comet/issues/1839";)) {
+     withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> 
"true") {
+       val df = sql(
+         """ WITH v as (
+@@ -1729,6 +1736,8 @@ abstract class DynamicPartitionPruningV1Suite extends 
DynamicPartitionPruningDat
                case s: BatchScanExec =>
                  // we use f1 col for v2 tables due to schema pruning
                  s.output.exists(_.exists(_.argString(maxFields = 
100).contains("f1")))
@@ -563,7 +621,7 @@ index 7af826583bd..3c3def1eb67 100644
      assert(shuffleMergeJoins.size == 1)
    }
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
-index 4d256154c85..43f0bebb00c 100644
+index 4d256154c85..66a5473852d 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
 @@ -31,7 +31,8 @@ import 
org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
@@ -689,7 +747,7 @@ index 4d256154c85..43f0bebb00c 100644
            }.size === 1)
            // Same result between shuffled hash join and sort merge join
            checkAnswer(shjDF, smjResult)
-@@ -1432,13 +1446,19 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
+@@ -1432,13 +1446,20 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
          assert(shjCodegenDF.queryExecution.executedPlan.collect {
            case WholeStageCodegenExec(_ : ShuffledHashJoinExec) => true
            case WholeStageCodegenExec(ProjectExec(_, _ : 
ShuffledHashJoinExec)) => true
@@ -697,6 +755,7 @@ index 4d256154c85..43f0bebb00c 100644
 +            true
 +          case WholeStageCodegenExec(ColumnarToRowExec(
 +            InputAdapter(CometProjectExec(_, _, _, _, _: CometHashJoinExec, 
_)))) => true
++          case _: CometHashJoinExec => true
          }.size === 1)
          checkAnswer(shjCodegenDF, Seq.empty)
  
@@ -710,7 +769,7 @@ index 4d256154c85..43f0bebb00c 100644
            checkAnswer(shjNonCodegenDF, Seq.empty)
          }
        }
-@@ -1486,7 +1506,8 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
+@@ -1486,7 +1507,8 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
            val plan = sql(getAggQuery(selectExpr, 
joinType)).queryExecution.executedPlan
            assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true 
}.size === 1)
            // Have shuffle before aggregation
@@ -720,7 +779,7 @@ index 4d256154c85..43f0bebb00c 100644
        }
  
        def getJoinQuery(selectExpr: String, joinType: String): String = {
-@@ -1515,9 +1536,12 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
+@@ -1515,9 +1537,12 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
            }
            val plan = sql(getJoinQuery(selectExpr, 
joinType)).queryExecution.executedPlan
            assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true 
}.size === 1)
@@ -735,7 +794,7 @@ index 4d256154c85..43f0bebb00c 100644
        }
  
        // Test output ordering is not preserved
-@@ -1526,9 +1550,12 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
+@@ -1526,9 +1551,12 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
            val selectExpr = "/*+ BROADCAST(left_t) */ k1 as k0"
            val plan = sql(getJoinQuery(selectExpr, 
joinType)).queryExecution.executedPlan
            assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true 
}.size === 1)
@@ -750,7 +809,7 @@ index 4d256154c85..43f0bebb00c 100644
        }
  
        // Test singe partition
-@@ -1538,7 +1565,8 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
+@@ -1538,7 +1566,8 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
             |FROM range(0, 10, 1, 1) t1 FULL OUTER JOIN range(0, 10, 1, 1) t2
             |""".stripMargin)
        val plan = fullJoinDF.queryExecution.executedPlan
@@ -760,7 +819,7 @@ index 4d256154c85..43f0bebb00c 100644
        checkAnswer(fullJoinDF, Row(100))
      }
    }
-@@ -1583,6 +1611,9 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
+@@ -1583,6 +1612,9 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
            Seq(semiJoinDF, antiJoinDF).foreach { df =>
              assert(collect(df.queryExecution.executedPlan) {
                case j: ShuffledHashJoinExec if j.ignoreDuplicatedKey == 
ignoreDuplicatedKey => true
@@ -770,7 +829,7 @@ index 4d256154c85..43f0bebb00c 100644
              }.size == 1)
            }
        }
-@@ -1627,14 +1658,20 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
+@@ -1627,14 +1659,20 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
  
    test("SPARK-43113: Full outer join with duplicate stream-side references in 
condition (SMJ)") {
      def check(plan: SparkPlan): Unit = {
@@ -793,6 +852,16 @@ index 4d256154c85..43f0bebb00c 100644
      }
      dupStreamSideColTest("SHUFFLE_HASH", check)
    }
+@@ -1770,7 +1808,8 @@ class ThreadLeakInSortMergeJoinSuite
+       sparkConf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, 20))
+   }
+ 
+-  test("SPARK-47146: thread leak when doing SortMergeJoin (with spill)") {
++  test("SPARK-47146: thread leak when doing SortMergeJoin (with spill)",
++    IgnoreComet("Comet does not support spilling")) {
+ 
+     withSQLConf(
+       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1") {
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala
 index c26757c9cff..d55775f09d7 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/PlanStabilitySuite.scala
@@ -1256,6 +1325,28 @@ index 47679ed7865..9ffbaecb98e 100644
      }.length == hashAggCount)
      assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s 
}.length == sortAggCount)
    }
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
+index eec396b2e39..bf3f1c769d6 100644
+--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
++++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
+@@ -18,7 +18,7 @@
+ package org.apache.spark.sql.execution
+ 
+ import org.apache.spark.TestUtils.assertSpilled
+-import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
++import org.apache.spark.sql.{AnalysisException, IgnoreComet, QueryTest, Row}
+ import 
org.apache.spark.sql.internal.SQLConf.{WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD, 
WINDOW_EXEC_BUFFER_SPILL_THRESHOLD}
+ import org.apache.spark.sql.test.SharedSparkSession
+ 
+@@ -470,7 +470,7 @@ class SQLWindowFunctionSuite extends QueryTest with 
SharedSparkSession {
+       Row(1, 3, null) :: Row(2, null, 4) :: Nil)
+   }
+ 
+-  test("test with low buffer spill threshold") {
++  test("test with low buffer spill threshold", IgnoreComet("Comet does not 
support spilling")) {
+     val nums = sparkContext.parallelize(1 to 10).map(x => (x, x % 
2)).toDF("x", "y")
+     nums.createOrReplaceTempView("nums")
+ 
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
 index b14f4a405f6..ab7baf434a5 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
diff --git a/dev/diffs/4.0.0-preview1.diff b/dev/diffs/4.0.0-preview1.diff
index 2fec4297d..a59c85c21 100644
--- a/dev/diffs/4.0.0-preview1.diff
+++ b/dev/diffs/4.0.0-preview1.diff
@@ -268,7 +268,7 @@ index d023fb82185..0f4f03bda6c 100644
  
        withTempView("t0", "t1", "t2") {
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
-index 620ee430cab..f5df9218fc1 100644
+index 620ee430cab..9d383a4bff9 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala
 @@ -28,7 +28,7 @@ import 
org.apache.spark.sql.catalyst.util.AUTO_GENERATED_ALIAS
@@ -289,16 +289,6 @@ index 620ee430cab..f5df9218fc1 100644
        }
        assert(exchangePlans.length == 1)
      }
-@@ -1275,7 +1275,8 @@ class DataFrameAggregateSuite extends QueryTest
-     }
-   }
- 
--  test("SPARK-32038: NormalizeFloatingNumbers should work on distinct 
aggregate") {
-+  test("SPARK-32038: NormalizeFloatingNumbers should work on distinct 
aggregate",
-+    IgnoreComet("TODO: 
https://github.com/apache/datafusion-comet/issues/1824";)) {
-     withTempView("view") {
-       val nan1 = java.lang.Float.intBitsToFloat(0x7f800001)
-       val nan2 = java.lang.Float.intBitsToFloat(0x7fffffff)
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
 index f6fd6b501d7..11870c85d82 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -374,6 +364,44 @@ index 760ee802608..b77133ffd37 100644
      }
      assert(exchanges.size == 2)
    }
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+index e3aff9b36ae..06196517935 100644
+--- 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
++++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+@@ -24,8 +24,9 @@ import 
org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression
+ import org.apache.spark.sql.catalyst.optimizer.TransposeWindow
+ import org.apache.spark.sql.catalyst.plans.logical.{Window => LogicalWindow}
+ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
++import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
+ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+-import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, 
Exchange, ShuffleExchangeExec}
++import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, 
Exchange, ShuffleExchangeExec, ShuffleExchangeLike}
+ import org.apache.spark.sql.execution.window.WindowExec
+ import org.apache.spark.sql.expressions.{Aggregator, 
MutableAggregationBuffer, UserDefinedAggregateFunction, Window}
+ import org.apache.spark.sql.functions._
+@@ -1142,10 +1143,12 @@ class DataFrameWindowFunctionsSuite extends QueryTest
+     }
+ 
+     def isShuffleExecByRequirement(
+-        plan: ShuffleExchangeExec,
++        plan: ShuffleExchangeLike,
+         desiredClusterColumns: Seq[String]): Boolean = plan match {
+       case ShuffleExchangeExec(op: HashPartitioning, _, ENSURE_REQUIREMENTS, 
_) =>
+         partitionExpressionsColumns(op.expressions) === desiredClusterColumns
++      case CometShuffleExchangeExec(op: HashPartitioning, _, _, 
ENSURE_REQUIREMENTS, _, _) =>
++        partitionExpressionsColumns(op.expressions) === desiredClusterColumns
+       case _ => false
+     }
+ 
+@@ -1168,7 +1171,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest
+       val shuffleByRequirement = windowed.queryExecution.executedPlan.exists {
+         case w: WindowExec =>
+           w.child.exists {
+-            case s: ShuffleExchangeExec => isShuffleExecByRequirement(s, 
Seq("key1", "key2"))
++            case s: ShuffleExchangeLike => isShuffleExecByRequirement(s, 
Seq("key1", "key2"))
+             case _ => false
+           }
+         case _ => false
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
 index 16a493b5290..3f0b70e2d59 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -397,7 +425,7 @@ index 16a493b5290..3f0b70e2d59 100644
      assert(exchanges.size == 2)
    }
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
-index 2c24cc7d570..3e6a8632fa6 100644
+index 2c24cc7d570..21d36ebc6f5 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
 +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
 @@ -22,6 +22,7 @@ import org.scalatest.GivenWhenThen
@@ -428,17 +456,37 @@ index 2c24cc7d570..3e6a8632fa6 100644
      Given("disable broadcast pruning and disable subquery duplication")
      withSQLConf(
        SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true",
-@@ -1215,7 +1220,8 @@ abstract class DynamicPartitionPruningSuiteBase
+@@ -1027,7 +1032,8 @@ abstract class DynamicPartitionPruningSuiteBase
+     }
+   }
+ 
+-  test("avoid reordering broadcast join keys to match input hash 
partitioning") {
++  test("avoid reordering broadcast join keys to match input hash 
partitioning",
++    IgnoreComet("TODO: 
https://github.com/apache/datafusion-comet/issues/1839";)) {
+     withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> 
"false",
+       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+       withTable("large", "dimTwo", "dimThree") {
+@@ -1215,7 +1221,8 @@ abstract class DynamicPartitionPruningSuiteBase
    }
  
    test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " +
 -    "canonicalization and exchange reuse") {
 +    "canonicalization and exchange reuse",
-+    IgnoreComet("TODO: Support SubqueryBroadcastExec in Comet: #1737")) {
++    IgnoreComet("TODO: 
https://github.com/apache/datafusion-comet/issues/1839";)) {
      withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> 
"true") {
        withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
          val df = sql(
-@@ -1455,7 +1461,8 @@ abstract class DynamicPartitionPruningSuiteBase
+@@ -1424,7 +1431,8 @@ abstract class DynamicPartitionPruningSuiteBase
+     }
+   }
+ 
+-  test("SPARK-34637: DPP side broadcast query stage is created firstly") {
++  test("SPARK-34637: DPP side broadcast query stage is created firstly",
++    IgnoreComet("TODO: 
https://github.com/apache/datafusion-comet/issues/1839";)) {
+     withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> 
"true") {
+       val df = sql(
+         """ WITH v as (
+@@ -1455,7 +1463,8 @@ abstract class DynamicPartitionPruningSuiteBase
      }
    }
  
@@ -448,7 +496,7 @@ index 2c24cc7d570..3e6a8632fa6 100644
      val df = sql(
        """
          |SELECT s.store_id, f.product_id
-@@ -1730,6 +1737,8 @@ abstract class DynamicPartitionPruningV1Suite extends 
DynamicPartitionPruningDat
+@@ -1730,6 +1739,8 @@ abstract class DynamicPartitionPruningV1Suite extends 
DynamicPartitionPruningDat
                case s: BatchScanExec =>
                  // we use f1 col for v2 tables due to schema pruning
                  s.output.exists(_.exists(_.argString(maxFields = 
100).contains("f1")))
@@ -649,7 +697,7 @@ index 53e47f428c3..a55d8f0c161 100644
      assert(shuffleMergeJoins.size == 1)
    }
 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
-index fcb937d82ba..fafe8e8d08b 100644
+index fcb937d82ba..fc208087a69 100644
 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
 +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
 @@ -29,7 +29,8 @@ import 
org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
@@ -775,7 +823,7 @@ index fcb937d82ba..fafe8e8d08b 100644
            }.size === 1)
            // Same result between shuffled hash join and sort merge join
            checkAnswer(shjDF, smjResult)
-@@ -1435,13 +1449,19 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
+@@ -1435,13 +1449,20 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
          assert(shjCodegenDF.queryExecution.executedPlan.collect {
            case WholeStageCodegenExec(_ : ShuffledHashJoinExec) => true
            case WholeStageCodegenExec(ProjectExec(_, _ : 
ShuffledHashJoinExec)) => true
@@ -783,6 +831,7 @@ index fcb937d82ba..fafe8e8d08b 100644
 +            true
 +          case WholeStageCodegenExec(ColumnarToRowExec(
 +            InputAdapter(CometProjectExec(_, _, _, _, _: CometHashJoinExec, 
_)))) => true
++          case _: CometHashJoinExec => true
          }.size === 1)
          checkAnswer(shjCodegenDF, Seq.empty)
  
@@ -796,7 +845,7 @@ index fcb937d82ba..fafe8e8d08b 100644
            checkAnswer(shjNonCodegenDF, Seq.empty)
          }
        }
-@@ -1489,7 +1509,8 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
+@@ -1489,7 +1510,8 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
            val plan = sql(getAggQuery(selectExpr, 
joinType)).queryExecution.executedPlan
            assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true 
}.size === 1)
            // Have shuffle before aggregation
@@ -806,7 +855,7 @@ index fcb937d82ba..fafe8e8d08b 100644
        }
  
        def getJoinQuery(selectExpr: String, joinType: String): String = {
-@@ -1518,9 +1539,12 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
+@@ -1518,9 +1540,12 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
            }
            val plan = sql(getJoinQuery(selectExpr, 
joinType)).queryExecution.executedPlan
            assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true 
}.size === 1)
@@ -821,7 +870,7 @@ index fcb937d82ba..fafe8e8d08b 100644
        }
  
        // Test output ordering is not preserved
-@@ -1529,9 +1553,12 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
+@@ -1529,9 +1554,12 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
            val selectExpr = "/*+ BROADCAST(left_t) */ k1 as k0"
            val plan = sql(getJoinQuery(selectExpr, 
joinType)).queryExecution.executedPlan
            assert(collect(plan) { case _: BroadcastNestedLoopJoinExec => true 
}.size === 1)
@@ -836,7 +885,7 @@ index fcb937d82ba..fafe8e8d08b 100644
        }
  
        // Test singe partition
-@@ -1541,7 +1568,8 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
+@@ -1541,7 +1569,8 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
             |FROM range(0, 10, 1, 1) t1 FULL OUTER JOIN range(0, 10, 1, 1) t2
             |""".stripMargin)
        val plan = fullJoinDF.queryExecution.executedPlan
@@ -846,7 +895,7 @@ index fcb937d82ba..fafe8e8d08b 100644
        checkAnswer(fullJoinDF, Row(100))
      }
    }
-@@ -1586,6 +1614,9 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
+@@ -1586,6 +1615,9 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
            Seq(semiJoinDF, antiJoinDF).foreach { df =>
              assert(collect(df.queryExecution.executedPlan) {
                case j: ShuffledHashJoinExec if j.ignoreDuplicatedKey == 
ignoreDuplicatedKey => true
@@ -856,7 +905,7 @@ index fcb937d82ba..fafe8e8d08b 100644
              }.size == 1)
            }
        }
-@@ -1630,14 +1661,20 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
+@@ -1630,14 +1662,20 @@ class JoinSuite extends QueryTest with 
SharedSparkSession with AdaptiveSparkPlan
  
    test("SPARK-43113: Full outer join with duplicate stream-side references in 
condition (SMJ)") {
      def check(plan: SparkPlan): Unit = {
@@ -879,7 +928,7 @@ index fcb937d82ba..fafe8e8d08b 100644
      }
      dupStreamSideColTest("SHUFFLE_HASH", check)
    }
-@@ -1773,7 +1810,8 @@ class ThreadLeakInSortMergeJoinSuite
+@@ -1773,7 +1811,8 @@ class ThreadLeakInSortMergeJoinSuite
        sparkConf.set(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD, 20))
    }
  
@@ -1451,6 +1500,28 @@ index 47679ed7865..9ffbaecb98e 100644
      }.length == hashAggCount)
      assert(collectWithSubqueries(plan) { case s: SortAggregateExec => s 
}.length == sortAggCount)
    }
+diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
+index eec396b2e39..bf3f1c769d6 100644
+--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
++++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLWindowFunctionSuite.scala
+@@ -18,7 +18,7 @@
+ package org.apache.spark.sql.execution
+ 
+ import org.apache.spark.TestUtils.assertSpilled
+-import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
++import org.apache.spark.sql.{AnalysisException, IgnoreComet, QueryTest, Row}
+ import 
org.apache.spark.sql.internal.SQLConf.{WINDOW_EXEC_BUFFER_IN_MEMORY_THRESHOLD, 
WINDOW_EXEC_BUFFER_SPILL_THRESHOLD}
+ import org.apache.spark.sql.test.SharedSparkSession
+ 
+@@ -470,7 +470,7 @@ class SQLWindowFunctionSuite extends QueryTest with 
SharedSparkSession {
+       Row(1, 3, null) :: Row(2, null, 4) :: Nil)
+   }
+ 
+-  test("test with low buffer spill threshold") {
++  test("test with low buffer spill threshold", IgnoreComet("Comet does not 
support spilling")) {
+     val nums = sparkContext.parallelize(1 to 10).map(x => (x, x % 
2)).toDF("x", "y")
+     nums.createOrReplaceTempView("nums")
+ 
 diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
 index 966f4e74712..8017e22d7f8 100644
 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
diff --git a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala 
b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
index c9dd4f17b..54e6e6364 100644
--- a/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
+++ b/spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.window.WindowExec
 import org.apache.spark.sql.types.{DoubleType, FloatType}
 
 import org.apache.comet.{CometConf, ExtendedExplainInfo}
-import org.apache.comet.CometConf.{COMET_ANSI_MODE_ENABLED, 
COMET_SHUFFLE_FALLBACK_TO_COLUMNAR}
+import org.apache.comet.CometConf.COMET_ANSI_MODE_ENABLED
 import org.apache.comet.CometSparkSessionExtensions.{createMessage, 
getCometBroadcastNotEnabledReason, getCometShuffleNotEnabledReason, 
isANSIEnabled, isCometBroadCastForceEnabled, isCometExecEnabled, 
isCometJVMShuffleMode, isCometLoaded, isCometNativeShuffleMode, isCometScan, 
isCometShuffleEnabled, isSpark40Plus, shouldApplySparkToColumnar, withInfo}
 import org.apache.comet.serde.OperatorOuterClass.Operator
 import org.apache.comet.serde.QueryPlanSerde
@@ -491,16 +491,9 @@ case class CometExecRule(session: SparkSession) extends 
Rule[SparkPlan] {
             None
           }
 
-        // this is a temporary workaround because some Spark SQL tests fail
-        // when we enable COMET_SHUFFLE_FALLBACK_TO_COLUMNAR due to valid bugs
-        // that we had not previously seen
-        val tryColumnarNext =
-          !nativePrecondition || (nativePrecondition && nativeShuffle.isEmpty 
&&
-            COMET_SHUFFLE_FALLBACK_TO_COLUMNAR.get(conf))
-
         val nativeOrColumnarShuffle = if (nativeShuffle.isDefined) {
           nativeShuffle
-        } else if (tryColumnarNext) {
+        } else {
           // Columnar shuffle for regular Spark operators (not Comet) and 
Comet operators
           // (if configured).
           // If the child of ShuffleExchangeExec is also a 
ShuffleExchangeExec, we should not
@@ -526,8 +519,6 @@ case class CometExecRule(session: SparkSession) extends 
Rule[SparkPlan] {
           } else {
             None
           }
-        } else {
-          None
         }
 
         if (nativeOrColumnarShuffle.isDefined) {
diff --git 
a/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala
 
b/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala
index ecc0823d6..a1a96d321 100644
--- 
a/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala
+++ 
b/spark/src/main/scala/org/apache/comet/rules/EliminateRedundantTransitions.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.comet.{CometCollectLimitExec, 
CometColumnarToRowExec
 import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, 
CometShuffleExchangeExec}
 import org.apache.spark.sql.execution.{ColumnarToRowExec, RowToColumnarExec, 
SparkPlan}
 import org.apache.spark.sql.execution.adaptive.QueryStageExec
+import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 
 import org.apache.comet.CometConf
 
@@ -56,7 +57,8 @@ case class EliminateRedundantTransitions(session: 
SparkSession) extends Rule[Spa
   override def apply(plan: SparkPlan): SparkPlan = {
     val newPlan = _apply(plan)
     if (showTransformations) {
-      logInfo(s"\nINPUT: $plan\nOUTPUT: $newPlan")
+      // scalastyle:off println
+      System.err.println(s"EliminateRedundantTransitions:\nINPUT: 
$plan\nOUTPUT: $newPlan")
     }
     newPlan
   }
@@ -64,7 +66,7 @@ case class EliminateRedundantTransitions(session: 
SparkSession) extends Rule[Spa
   private def _apply(plan: SparkPlan): SparkPlan = {
     val eliminatedPlan = plan transformUp {
       case ColumnarToRowExec(shuffleExchangeExec: CometShuffleExchangeExec)
-          if (plan.conf.adaptiveExecutionEnabled) =>
+          if plan.conf.adaptiveExecutionEnabled =>
         shuffleExchangeExec
       case ColumnarToRowExec(sparkToColumnar: CometSparkToColumnarExec) =>
         if (sparkToColumnar.child.supportsColumnar) {
@@ -112,6 +114,7 @@ case class EliminateRedundantTransitions(session: 
SparkSession) extends Rule[Spa
   private def hasCometNativeChild(op: SparkPlan): Boolean = {
     op match {
       case c: QueryStageExec => hasCometNativeChild(c.plan)
+      case c: ReusedExchangeExec => hasCometNativeChild(c.child)
       case _ => op.exists(_.isInstanceOf[CometPlan])
     }
   }
diff --git 
a/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala 
b/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala
index 0391a1c3b..95e03ca69 100644
--- 
a/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala
+++ 
b/spark/src/main/scala/org/apache/spark/sql/comet/CometColumnarToRowExec.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.comet.util.{Utils => CometUtils}
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.{CodegenSupport, 
ColumnarToRowTransition, SparkPlan, SQLExecution}
 import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec
+import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.execution.vectorized.{ConstantColumnVector, 
WritableColumnVector}
 import org.apache.spark.sql.types._
@@ -172,6 +173,7 @@ case class CometColumnarToRowExec(child: SparkPlan)
     op match {
       case b: CometBroadcastExchangeExec => Some(b)
       case b: BroadcastQueryStageExec => findCometBroadcastExchange(b.plan)
+      case b: ReusedExchangeExec => findCometBroadcastExchange(b.child)
       case _ => 
op.children.collectFirst(Function.unlift(findCometBroadcastExchange))
     }
   }
diff --git 
a/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala 
b/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala
index 8d084fd75..0e582ddab 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala
@@ -188,6 +188,7 @@ class CometTPCDSQuerySuite
     conf.set(CometConf.COMET_NATIVE_SCAN_ENABLED.key, "true")
     conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true")
     conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "15g")
+    conf.set(CometConf.COMET_EXPLAIN_TRANSFORMATIONS.key, "true")
     conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
     conf.set(MEMORY_OFFHEAP_ENABLED.key, "true")
     conf.set(MEMORY_OFFHEAP_SIZE.key, "15g")
diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala 
b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
index 0b15def98..2a7983f0b 100644
--- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala
@@ -78,7 +78,6 @@ abstract class CometTestBase
     conf.set(CometConf.COMET_ENABLED.key, "true")
     conf.set(CometConf.COMET_EXEC_ENABLED.key, "true")
     conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true")
-    conf.set(CometConf.COMET_SHUFFLE_FALLBACK_TO_COLUMNAR.key, "true")
     conf.set(CometConf.COMET_SPARK_TO_ARROW_ENABLED.key, "true")
     conf.set(CometConf.COMET_NATIVE_SCAN_ENABLED.key, "true")
     conf.set(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key, "true")
diff --git 
a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala 
b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
index 9f611612f..cf887a101 100644
--- 
a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
+++ 
b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala
@@ -272,7 +272,6 @@ trait CometPlanStabilitySuite extends 
DisableAdaptiveExecutionSuite with TPCDSBa
       CometConf.COMET_DPP_FALLBACK_ENABLED.key -> "false",
       SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> dppEnabled.toString,
       CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
-      CometConf.COMET_SHUFFLE_FALLBACK_TO_COLUMNAR.key -> "true",
       CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key -> 
"true",
       CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true", // needed for 
v1.4/q9, v1.4/q44, v2.7.0/q6, v2.7.0/q64
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "10MB") {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to