This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new f55f882  [SPARK-36079][SQL] Null-based filter estimate should always 
be in the range [0, 1]
f55f882 is described below

commit f55f8820fcbc10ed514d3afafe587b7bb68d8d5f
Author: Karen Feng <karen.f...@databricks.com>
AuthorDate: Tue Jul 20 21:32:13 2021 +0800

    [SPARK-36079][SQL] Null-based filter estimate should always be in the range 
[0, 1]
    
    ### What changes were proposed in this pull request?
    
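    Forces the selectivity estimate for null-based filters to be in the range 
`[0,1]`. Column null counts are now also scaled down, together with distinct 
counts, when a filter or join reduces the row count.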
    
    ### Why are the changes needed?
    
    I noticed in a few TPC-DS query tests that the column statistic null count 
can be higher than the table statistic row count. When that happens, the 
current implementation computes a negative selectivity estimate for `IsNotNull`.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Unit test
    
    Closes #33286 from karenfeng/bound-selectivity-est.
    
    Authored-by: Karen Feng <karen.f...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit ddc61e62b9af5deff1b93e22f466f2a13f281155)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../sql/catalyst/plans/logical/Statistics.scala    |  13 +
 .../logical/statsEstimation/EstimationUtils.scala  |  18 +-
 .../logical/statsEstimation/FilterEstimation.scala |  30 +--
 .../logical/statsEstimation/JoinEstimation.scala   |  13 +-
 .../statsEstimation/FilterEstimationSuite.scala    |  40 ++-
 .../approved-plans-modified/q19.sf100/explain.txt  | 254 +++++++++----------
 .../q19.sf100/simplified.txt                       | 104 ++++----
 .../approved-plans-modified/q68.sf100/explain.txt  | 207 ++++++++--------
 .../q68.sf100/simplified.txt                       | 141 ++++++-----
 .../approved-plans-modified/q73.sf100/explain.txt  | 272 +++++++++++----------
 .../q73.sf100/simplified.txt                       |  56 +++--
 11 files changed, 591 insertions(+), 557 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
index 7db3ee5..6f3ec3b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
@@ -24,6 +24,7 @@ import net.jpountz.lz4.{LZ4BlockInputStream, 
LZ4BlockOutputStream}
 
 import org.apache.spark.sql.catalyst.catalog.CatalogColumnStat
 import org.apache.spark.sql.catalyst.expressions._
+import 
org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
@@ -119,6 +120,18 @@ case class ColumnStat(
       maxLen = maxLen,
       histogram = histogram,
       version = version)
+
+  def updateCountStats(
+      oldNumRows: BigInt,
+      newNumRows: BigInt,
+      updatedColumnStatOpt: Option[ColumnStat] = None): ColumnStat = {
+    val updatedColumnStat = updatedColumnStatOpt.getOrElse(this)
+    val newDistinctCount = EstimationUtils.updateStat(oldNumRows, newNumRows,
+      distinctCount, updatedColumnStat.distinctCount)
+    val newNullCount = EstimationUtils.updateStat(oldNumRows, newNumRows,
+      nullCount, updatedColumnStat.nullCount)
+    updatedColumnStat.copy(distinctCount = newDistinctCount, nullCount = 
newNullCount)
+  }
 }
 
 /**
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
index 967cced..dafb979 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
@@ -52,14 +52,20 @@ object EstimationUtils {
   }
 
   /**
-   * Updates (scales down) the number of distinct values if the number of rows 
decreases after
-   * some operation (such as filter, join). Otherwise keep it unchanged.
+   * Updates (scales down) a statistic (eg. number of distinct values) if the 
number of rows
+   * decreases after some operation (such as filter, join). Otherwise keep it 
unchanged.
    */
-  def updateNdv(oldNumRows: BigInt, newNumRows: BigInt, oldNdv: BigInt): 
BigInt = {
-    if (newNumRows < oldNumRows) {
-      ceil(BigDecimal(oldNdv) * BigDecimal(newNumRows) / 
BigDecimal(oldNumRows))
+  def updateStat(
+      oldNumRows: BigInt,
+      newNumRows: BigInt,
+      oldStatOpt: Option[BigInt],
+      updatedStatOpt: Option[BigInt]): Option[BigInt] = {
+    if (oldStatOpt.isDefined && updatedStatOpt.isDefined && updatedStatOpt.get 
> 1 &&
+      newNumRows < oldNumRows) {
+        // no need to scale down since it is already down to 1
+        Some(ceil(BigDecimal(oldStatOpt.get) * BigDecimal(newNumRows) / 
BigDecimal(oldNumRows)))
     } else {
-      oldNdv
+      updatedStatOpt
     }
   }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
index f7453e2..840a475 100755
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
@@ -106,7 +106,7 @@ case class FilterEstimation(plan: Filter) extends Logging {
       // The foldable Not has been processed in the ConstantFolding rule
       // This is a top-down traversal. The Not could be pushed down by the 
above two cases.
       case Not(l @ Literal(null, _)) =>
-        calculateSingleCondition(l, update = false)
+        calculateSingleCondition(l, update = false).map(boundProbability(_))
 
       case Not(cond) =>
         calculateFilterSelectivity(cond, update = false) match {
@@ -115,7 +115,7 @@ case class FilterEstimation(plan: Filter) extends Logging {
         }
 
       case _ =>
-        calculateSingleCondition(condition, update)
+        calculateSingleCondition(condition, update).map(boundProbability(_))
     }
   }
 
@@ -233,6 +233,8 @@ case class FilterEstimation(plan: Filter) extends Logging {
     val rowCountValue = childStats.rowCount.get
     val nullPercent: Double = if (rowCountValue == 0) {
       0
+    } else if (colStat.nullCount.get > rowCountValue) {
+      1
     } else {
       (BigDecimal(colStat.nullCount.get) / BigDecimal(rowCountValue)).toDouble
     }
@@ -854,6 +856,10 @@ case class FilterEstimation(plan: Filter) extends Logging {
     Some(percent)
   }
 
+  // Bound result in [0, 1]
+  private def boundProbability(p: Double): Double = {
+    Math.max(0.0, Math.min(1.0, p))
+  }
 }
 
 /**
@@ -907,26 +913,14 @@ case class ColumnStatsMap(originalMap: 
AttributeMap[ColumnStat]) {
   def update(a: Attribute, stats: ColumnStat): Unit = 
updatedMap.update(a.exprId, a -> stats)
 
   /**
-   * Collects updated column stats, and scales down ndv for other column stats 
if the number of rows
-   * decreases after this Filter operator.
+   * Collects updated column stats; scales down column count stats if the
+   * number of rows decreases after this Filter operator.
    */
   def outputColumnStats(rowsBeforeFilter: BigInt, rowsAfterFilter: BigInt)
     : AttributeMap[ColumnStat] = {
     val newColumnStats = originalMap.map { case (attr, oriColStat) =>
-      val colStat = updatedMap.get(attr.exprId).map(_._2).getOrElse(oriColStat)
-      val newNdv = if (colStat.distinctCount.isEmpty) {
-        // No NDV in the original stats.
-        None
-      } else if (colStat.distinctCount.get > 1) {
-        // Update ndv based on the overall filter selectivity: scale down ndv 
if the number of rows
-        // decreases; otherwise keep it unchanged.
-        Some(EstimationUtils.updateNdv(oldNumRows = rowsBeforeFilter,
-          newNumRows = rowsAfterFilter, oldNdv = oriColStat.distinctCount.get))
-      } else {
-        // no need to scale down since it is already down to 1 (for skewed 
distribution case)
-        colStat.distinctCount
-      }
-      attr -> colStat.copy(distinctCount = newNdv)
+      attr -> oriColStat.updateCountStats(
+        rowsBeforeFilter, rowsAfterFilter, 
updatedMap.get(attr.exprId).map(_._2))
     }
     AttributeMap(newColumnStats.toSeq)
   }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
index 777a4c8..c966117 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
@@ -308,17 +308,12 @@ case class JoinEstimation(join: Join) extends Logging {
         outputAttrStats += a -> keyStatsAfterJoin(a)
       } else {
         val oldColStat = oldAttrStats(a)
-        val oldNdv = oldColStat.distinctCount
-        val newNdv = if (oldNdv.isDefined) {
-          Some(if (join.left.outputSet.contains(a)) {
-            updateNdv(oldNumRows = leftRows, newNumRows = outputRows, oldNdv = 
oldNdv.get)
-          } else {
-            updateNdv(oldNumRows = rightRows, newNumRows = outputRows, oldNdv 
= oldNdv.get)
-          })
+        val oldNumRows = if (join.left.outputSet.contains(a)) {
+          leftRows
         } else {
-          None
+          rightRows
         }
-        val newColStat = oldColStat.copy(distinctCount = newNdv)
+        val newColStat = oldColStat.updateCountStats(oldNumRows, outputRows)
         // TODO: support nullCount updates for specific outer joins
         outputAttrStats += a -> newColStat
       }
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
index 878fae4..2ec2475 100755
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
@@ -822,6 +822,41 @@ class FilterEstimationSuite extends 
StatsEstimationTestBase {
       expectedRowCount = 3)
   }
 
+  test("SPARK-36079: Null count should be no higher than row count after 
filter") {
+    val colStatNullableString = colStatString.copy(nullCount = Some(10))
+    val condition = Filter(EqualTo(attrBool, Literal(true)),
+      childStatsTestPlan(Seq(attrBool, attrString), tableRowCount = 10L,
+        attributeMap = AttributeMap(Seq(
+          attrBool -> colStatBool, attrString -> colStatNullableString))))
+    validateEstimatedStats(
+      condition,
+      Seq(attrBool -> colStatBool.copy(distinctCount = Some(1), min = 
Some(true)),
+        attrString -> colStatNullableString.copy(distinctCount = Some(5), 
nullCount = Some(5))),
+      expectedRowCount = 5)
+  }
+
+  test("SPARK-36079: Null count higher than row count") {
+    val colStatNullableString = colStatString.copy(nullCount = Some(15))
+    val condition = Filter(IsNotNull(attrString),
+      childStatsTestPlan(Seq(attrString), tableRowCount = 10L,
+        attributeMap = AttributeMap(Seq(attrString -> colStatNullableString))))
+    validateEstimatedStats(
+      condition,
+      Seq(attrString -> colStatNullableString),
+      expectedRowCount = 0)
+  }
+
+  test("SPARK-36079: Bound selectivity >= 0") {
+    val colStatNullableString = colStatString.copy(nullCount = Some(-1))
+    val condition = Filter(IsNotNull(attrString),
+      childStatsTestPlan(Seq(attrString), tableRowCount = 10L,
+        attributeMap = AttributeMap(Seq(attrString -> colStatNullableString))))
+    validateEstimatedStats(
+      condition,
+      Seq(attrString -> colStatString),
+      expectedRowCount = 10)
+  }
+
   test("ColumnStatsMap tests") {
     val attrNoDistinct = AttributeReference("att_without_distinct", 
IntegerType)()
     val attrNoCount = AttributeReference("att_without_count", BooleanType)()
@@ -848,7 +883,10 @@ class FilterEstimationSuite extends 
StatsEstimationTestBase {
     assert(!columnStatsMap.hasMinMaxStats(attrNoMinMax))
   }
 
-  private def childStatsTestPlan(outList: Seq[Attribute], tableRowCount: 
BigInt): StatsTestPlan = {
+  private def childStatsTestPlan(
+      outList: Seq[Attribute],
+      tableRowCount: BigInt,
+      attributeMap: AttributeMap[ColumnStat] = attributeMap): StatsTestPlan = {
     StatsTestPlan(
       outputList = outList,
       rowCount = tableRowCount,
diff --git 
a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-modified/q19.sf100/explain.txt
 
b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-modified/q19.sf100/explain.txt
index 858cbaa..6b4fce4 100644
--- 
a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-modified/q19.sf100/explain.txt
+++ 
b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-modified/q19.sf100/explain.txt
@@ -1,49 +1,43 @@
 == Physical Plan ==
-TakeOrderedAndProject (45)
-+- * HashAggregate (44)
-   +- Exchange (43)
-      +- * HashAggregate (42)
-         +- * Project (41)
-            +- * BroadcastHashJoin Inner BuildRight (40)
-               :- * Project (34)
-               :  +- * SortMergeJoin Inner (33)
-               :     :- * Sort (27)
-               :     :  +- Exchange (26)
-               :     :     +- * Project (25)
-               :     :        +- * BroadcastHashJoin Inner BuildRight (24)
-               :     :           :- * Project (19)
-               :     :           :  +- * SortMergeJoin Inner (18)
-               :     :           :     :- * Sort (12)
-               :     :           :     :  +- Exchange (11)
-               :     :           :     :     +- * Project (10)
-               :     :           :     :        +- * BroadcastHashJoin Inner 
BuildLeft (9)
-               :     :           :     :           :- BroadcastExchange (5)
-               :     :           :     :           :  +- * Project (4)
-               :     :           :     :           :     +- * Filter (3)
-               :     :           :     :           :        +- * ColumnarToRow 
(2)
-               :     :           :     :           :           +- Scan parquet 
default.date_dim (1)
-               :     :           :     :           +- * Filter (8)
-               :     :           :     :              +- * ColumnarToRow (7)
-               :     :           :     :                 +- Scan parquet 
default.store_sales (6)
-               :     :           :     +- * Sort (17)
-               :     :           :        +- Exchange (16)
-               :     :           :           +- * Filter (15)
-               :     :           :              +- * ColumnarToRow (14)
-               :     :           :                 +- Scan parquet 
default.customer (13)
-               :     :           +- BroadcastExchange (23)
-               :     :              +- * Filter (22)
-               :     :                 +- * ColumnarToRow (21)
-               :     :                    +- Scan parquet default.store (20)
-               :     +- * Sort (32)
-               :        +- Exchange (31)
-               :           +- * Filter (30)
-               :              +- * ColumnarToRow (29)
-               :                 +- Scan parquet default.customer_address (28)
-               +- BroadcastExchange (39)
-                  +- * Project (38)
-                     +- * Filter (37)
-                        +- * ColumnarToRow (36)
-                           +- Scan parquet default.item (35)
+TakeOrderedAndProject (39)
++- * HashAggregate (38)
+   +- Exchange (37)
+      +- * HashAggregate (36)
+         +- * Project (35)
+            +- * BroadcastHashJoin Inner BuildRight (34)
+               :- * Project (28)
+               :  +- * BroadcastHashJoin Inner BuildLeft (27)
+               :     :- BroadcastExchange (23)
+               :     :  +- * Project (22)
+               :     :     +- * BroadcastHashJoin Inner BuildRight (21)
+               :     :        :- * Project (16)
+               :     :        :  +- * BroadcastHashJoin Inner BuildLeft (15)
+               :     :        :     :- BroadcastExchange (11)
+               :     :        :     :  +- * Project (10)
+               :     :        :     :     +- * BroadcastHashJoin Inner 
BuildLeft (9)
+               :     :        :     :        :- BroadcastExchange (5)
+               :     :        :     :        :  +- * Project (4)
+               :     :        :     :        :     +- * Filter (3)
+               :     :        :     :        :        +- * ColumnarToRow (2)
+               :     :        :     :        :           +- Scan parquet 
default.date_dim (1)
+               :     :        :     :        +- * Filter (8)
+               :     :        :     :           +- * ColumnarToRow (7)
+               :     :        :     :              +- Scan parquet 
default.store_sales (6)
+               :     :        :     +- * Filter (14)
+               :     :        :        +- * ColumnarToRow (13)
+               :     :        :           +- Scan parquet default.customer (12)
+               :     :        +- BroadcastExchange (20)
+               :     :           +- * Filter (19)
+               :     :              +- * ColumnarToRow (18)
+               :     :                 +- Scan parquet default.store (17)
+               :     +- * Filter (26)
+               :        +- * ColumnarToRow (25)
+               :           +- Scan parquet default.customer_address (24)
+               +- BroadcastExchange (33)
+                  +- * Project (32)
+                     +- * Filter (31)
+                        +- * ColumnarToRow (30)
+                           +- Scan parquet default.item (29)
 
 
 (1) Scan parquet default.date_dim
@@ -92,171 +86,147 @@ Join condition: None
 Output [4]: [ss_item_sk#5, ss_customer_sk#6, ss_store_sk#7, 
ss_ext_sales_price#8]
 Input [6]: [d_date_sk#1, ss_item_sk#5, ss_customer_sk#6, ss_store_sk#7, 
ss_ext_sales_price#8, ss_sold_date_sk#9]
 
-(11) Exchange
+(11) BroadcastExchange
 Input [4]: [ss_item_sk#5, ss_customer_sk#6, ss_store_sk#7, 
ss_ext_sales_price#8]
-Arguments: hashpartitioning(ss_customer_sk#6, 5), ENSURE_REQUIREMENTS, [id=#11]
+Arguments: HashedRelationBroadcastMode(List(cast(input[1, int, true] as 
bigint)),false), [id=#11]
 
-(12) Sort [codegen id : 3]
-Input [4]: [ss_item_sk#5, ss_customer_sk#6, ss_store_sk#7, 
ss_ext_sales_price#8]
-Arguments: [ss_customer_sk#6 ASC NULLS FIRST], false, 0
-
-(13) Scan parquet default.customer
+(12) Scan parquet default.customer
 Output [2]: [c_customer_sk#12, c_current_addr_sk#13]
 Batched: true
 Location [not included in comparison]/{warehouse_dir}/customer]
 PushedFilters: [IsNotNull(c_customer_sk), IsNotNull(c_current_addr_sk)]
 ReadSchema: struct<c_customer_sk:int,c_current_addr_sk:int>
 
-(14) ColumnarToRow [codegen id : 4]
+(13) ColumnarToRow
 Input [2]: [c_customer_sk#12, c_current_addr_sk#13]
 
-(15) Filter [codegen id : 4]
+(14) Filter
 Input [2]: [c_customer_sk#12, c_current_addr_sk#13]
 Condition : (isnotnull(c_customer_sk#12) AND isnotnull(c_current_addr_sk#13))
 
-(16) Exchange
-Input [2]: [c_customer_sk#12, c_current_addr_sk#13]
-Arguments: hashpartitioning(c_customer_sk#12, 5), ENSURE_REQUIREMENTS, [id=#14]
-
-(17) Sort [codegen id : 5]
-Input [2]: [c_customer_sk#12, c_current_addr_sk#13]
-Arguments: [c_customer_sk#12 ASC NULLS FIRST], false, 0
-
-(18) SortMergeJoin [codegen id : 7]
+(15) BroadcastHashJoin [codegen id : 4]
 Left keys [1]: [ss_customer_sk#6]
 Right keys [1]: [c_customer_sk#12]
 Join condition: None
 
-(19) Project [codegen id : 7]
+(16) Project [codegen id : 4]
 Output [4]: [ss_item_sk#5, ss_store_sk#7, ss_ext_sales_price#8, 
c_current_addr_sk#13]
 Input [6]: [ss_item_sk#5, ss_customer_sk#6, ss_store_sk#7, 
ss_ext_sales_price#8, c_customer_sk#12, c_current_addr_sk#13]
 
-(20) Scan parquet default.store
-Output [2]: [s_store_sk#15, s_zip#16]
+(17) Scan parquet default.store
+Output [2]: [s_store_sk#14, s_zip#15]
 Batched: true
 Location [not included in comparison]/{warehouse_dir}/store]
 PushedFilters: [IsNotNull(s_zip), IsNotNull(s_store_sk)]
 ReadSchema: struct<s_store_sk:int,s_zip:string>
 
-(21) ColumnarToRow [codegen id : 6]
-Input [2]: [s_store_sk#15, s_zip#16]
+(18) ColumnarToRow [codegen id : 3]
+Input [2]: [s_store_sk#14, s_zip#15]
 
-(22) Filter [codegen id : 6]
-Input [2]: [s_store_sk#15, s_zip#16]
-Condition : (isnotnull(s_zip#16) AND isnotnull(s_store_sk#15))
+(19) Filter [codegen id : 3]
+Input [2]: [s_store_sk#14, s_zip#15]
+Condition : (isnotnull(s_zip#15) AND isnotnull(s_store_sk#14))
 
-(23) BroadcastExchange
-Input [2]: [s_store_sk#15, s_zip#16]
-Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as 
bigint)),false), [id=#17]
+(20) BroadcastExchange
+Input [2]: [s_store_sk#14, s_zip#15]
+Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as 
bigint)),false), [id=#16]
 
-(24) BroadcastHashJoin [codegen id : 7]
+(21) BroadcastHashJoin [codegen id : 4]
 Left keys [1]: [ss_store_sk#7]
-Right keys [1]: [s_store_sk#15]
+Right keys [1]: [s_store_sk#14]
 Join condition: None
 
-(25) Project [codegen id : 7]
-Output [4]: [ss_item_sk#5, ss_ext_sales_price#8, c_current_addr_sk#13, 
s_zip#16]
-Input [6]: [ss_item_sk#5, ss_store_sk#7, ss_ext_sales_price#8, 
c_current_addr_sk#13, s_store_sk#15, s_zip#16]
-
-(26) Exchange
-Input [4]: [ss_item_sk#5, ss_ext_sales_price#8, c_current_addr_sk#13, s_zip#16]
-Arguments: hashpartitioning(c_current_addr_sk#13, 5), ENSURE_REQUIREMENTS, 
[id=#18]
+(22) Project [codegen id : 4]
+Output [4]: [ss_item_sk#5, ss_ext_sales_price#8, c_current_addr_sk#13, 
s_zip#15]
+Input [6]: [ss_item_sk#5, ss_store_sk#7, ss_ext_sales_price#8, 
c_current_addr_sk#13, s_store_sk#14, s_zip#15]
 
-(27) Sort [codegen id : 8]
-Input [4]: [ss_item_sk#5, ss_ext_sales_price#8, c_current_addr_sk#13, s_zip#16]
-Arguments: [c_current_addr_sk#13 ASC NULLS FIRST], false, 0
+(23) BroadcastExchange
+Input [4]: [ss_item_sk#5, ss_ext_sales_price#8, c_current_addr_sk#13, s_zip#15]
+Arguments: HashedRelationBroadcastMode(List(cast(input[2, int, true] as 
bigint)),false), [id=#17]
 
-(28) Scan parquet default.customer_address
-Output [2]: [ca_address_sk#19, ca_zip#20]
+(24) Scan parquet default.customer_address
+Output [2]: [ca_address_sk#18, ca_zip#19]
 Batched: true
 Location [not included in comparison]/{warehouse_dir}/customer_address]
 PushedFilters: [IsNotNull(ca_address_sk), IsNotNull(ca_zip)]
 ReadSchema: struct<ca_address_sk:int,ca_zip:string>
 
-(29) ColumnarToRow [codegen id : 9]
-Input [2]: [ca_address_sk#19, ca_zip#20]
-
-(30) Filter [codegen id : 9]
-Input [2]: [ca_address_sk#19, ca_zip#20]
-Condition : (isnotnull(ca_address_sk#19) AND isnotnull(ca_zip#20))
-
-(31) Exchange
-Input [2]: [ca_address_sk#19, ca_zip#20]
-Arguments: hashpartitioning(ca_address_sk#19, 5), ENSURE_REQUIREMENTS, [id=#21]
+(25) ColumnarToRow
+Input [2]: [ca_address_sk#18, ca_zip#19]
 
-(32) Sort [codegen id : 10]
-Input [2]: [ca_address_sk#19, ca_zip#20]
-Arguments: [ca_address_sk#19 ASC NULLS FIRST], false, 0
+(26) Filter
+Input [2]: [ca_address_sk#18, ca_zip#19]
+Condition : (isnotnull(ca_address_sk#18) AND isnotnull(ca_zip#19))
 
-(33) SortMergeJoin [codegen id : 12]
+(27) BroadcastHashJoin [codegen id : 6]
 Left keys [1]: [c_current_addr_sk#13]
-Right keys [1]: [ca_address_sk#19]
-Join condition: NOT (substr(ca_zip#20, 1, 5) = substr(s_zip#16, 1, 5))
+Right keys [1]: [ca_address_sk#18]
+Join condition: NOT (substr(ca_zip#19, 1, 5) = substr(s_zip#15, 1, 5))
 
-(34) Project [codegen id : 12]
+(28) Project [codegen id : 6]
 Output [2]: [ss_item_sk#5, ss_ext_sales_price#8]
-Input [6]: [ss_item_sk#5, ss_ext_sales_price#8, c_current_addr_sk#13, 
s_zip#16, ca_address_sk#19, ca_zip#20]
+Input [6]: [ss_item_sk#5, ss_ext_sales_price#8, c_current_addr_sk#13, 
s_zip#15, ca_address_sk#18, ca_zip#19]
 
-(35) Scan parquet default.item
-Output [6]: [i_item_sk#22, i_brand_id#23, i_brand#24, i_manufact_id#25, 
i_manufact#26, i_manager_id#27]
+(29) Scan parquet default.item
+Output [6]: [i_item_sk#20, i_brand_id#21, i_brand#22, i_manufact_id#23, 
i_manufact#24, i_manager_id#25]
 Batched: true
 Location [not included in comparison]/{warehouse_dir}/item]
 PushedFilters: [IsNotNull(i_manager_id), EqualTo(i_manager_id,7), 
IsNotNull(i_item_sk)]
 ReadSchema: 
struct<i_item_sk:int,i_brand_id:int,i_brand:string,i_manufact_id:int,i_manufact:string,i_manager_id:int>
 
-(36) ColumnarToRow [codegen id : 11]
-Input [6]: [i_item_sk#22, i_brand_id#23, i_brand#24, i_manufact_id#25, 
i_manufact#26, i_manager_id#27]
+(30) ColumnarToRow [codegen id : 5]
+Input [6]: [i_item_sk#20, i_brand_id#21, i_brand#22, i_manufact_id#23, 
i_manufact#24, i_manager_id#25]
 
-(37) Filter [codegen id : 11]
-Input [6]: [i_item_sk#22, i_brand_id#23, i_brand#24, i_manufact_id#25, 
i_manufact#26, i_manager_id#27]
-Condition : ((isnotnull(i_manager_id#27) AND (i_manager_id#27 = 7)) AND 
isnotnull(i_item_sk#22))
+(31) Filter [codegen id : 5]
+Input [6]: [i_item_sk#20, i_brand_id#21, i_brand#22, i_manufact_id#23, 
i_manufact#24, i_manager_id#25]
+Condition : ((isnotnull(i_manager_id#25) AND (i_manager_id#25 = 7)) AND 
isnotnull(i_item_sk#20))
 
-(38) Project [codegen id : 11]
-Output [5]: [i_item_sk#22, i_brand_id#23, i_brand#24, i_manufact_id#25, 
i_manufact#26]
-Input [6]: [i_item_sk#22, i_brand_id#23, i_brand#24, i_manufact_id#25, 
i_manufact#26, i_manager_id#27]
+(32) Project [codegen id : 5]
+Output [5]: [i_item_sk#20, i_brand_id#21, i_brand#22, i_manufact_id#23, 
i_manufact#24]
+Input [6]: [i_item_sk#20, i_brand_id#21, i_brand#22, i_manufact_id#23, 
i_manufact#24, i_manager_id#25]
 
-(39) BroadcastExchange
-Input [5]: [i_item_sk#22, i_brand_id#23, i_brand#24, i_manufact_id#25, 
i_manufact#26]
-Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as 
bigint)),false), [id=#28]
+(33) BroadcastExchange
+Input [5]: [i_item_sk#20, i_brand_id#21, i_brand#22, i_manufact_id#23, 
i_manufact#24]
+Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as 
bigint)),false), [id=#26]
 
-(40) BroadcastHashJoin [codegen id : 12]
+(34) BroadcastHashJoin [codegen id : 6]
 Left keys [1]: [ss_item_sk#5]
-Right keys [1]: [i_item_sk#22]
+Right keys [1]: [i_item_sk#20]
 Join condition: None
 
-(41) Project [codegen id : 12]
-Output [5]: [ss_ext_sales_price#8, i_brand_id#23, i_brand#24, 
i_manufact_id#25, i_manufact#26]
-Input [7]: [ss_item_sk#5, ss_ext_sales_price#8, i_item_sk#22, i_brand_id#23, 
i_brand#24, i_manufact_id#25, i_manufact#26]
+(35) Project [codegen id : 6]
+Output [5]: [ss_ext_sales_price#8, i_brand_id#21, i_brand#22, 
i_manufact_id#23, i_manufact#24]
+Input [7]: [ss_item_sk#5, ss_ext_sales_price#8, i_item_sk#20, i_brand_id#21, 
i_brand#22, i_manufact_id#23, i_manufact#24]
 
-(42) HashAggregate [codegen id : 12]
-Input [5]: [ss_ext_sales_price#8, i_brand_id#23, i_brand#24, i_manufact_id#25, 
i_manufact#26]
-Keys [4]: [i_brand#24, i_brand_id#23, i_manufact_id#25, i_manufact#26]
+(36) HashAggregate [codegen id : 6]
+Input [5]: [ss_ext_sales_price#8, i_brand_id#21, i_brand#22, i_manufact_id#23, 
i_manufact#24]
+Keys [4]: [i_brand#22, i_brand_id#21, i_manufact_id#23, i_manufact#24]
 Functions [1]: [partial_sum(UnscaledValue(ss_ext_sales_price#8))]
-Aggregate Attributes [1]: [sum#29]
-Results [5]: [i_brand#24, i_brand_id#23, i_manufact_id#25, i_manufact#26, 
sum#30]
+Aggregate Attributes [1]: [sum#27]
+Results [5]: [i_brand#22, i_brand_id#21, i_manufact_id#23, i_manufact#24, 
sum#28]
 
-(43) Exchange
-Input [5]: [i_brand#24, i_brand_id#23, i_manufact_id#25, i_manufact#26, sum#30]
-Arguments: hashpartitioning(i_brand#24, i_brand_id#23, i_manufact_id#25, 
i_manufact#26, 5), ENSURE_REQUIREMENTS, [id=#31]
+(37) Exchange
+Input [5]: [i_brand#22, i_brand_id#21, i_manufact_id#23, i_manufact#24, sum#28]
+Arguments: hashpartitioning(i_brand#22, i_brand_id#21, i_manufact_id#23, 
i_manufact#24, 5), ENSURE_REQUIREMENTS, [id=#29]
 
-(44) HashAggregate [codegen id : 13]
-Input [5]: [i_brand#24, i_brand_id#23, i_manufact_id#25, i_manufact#26, sum#30]
-Keys [4]: [i_brand#24, i_brand_id#23, i_manufact_id#25, i_manufact#26]
+(38) HashAggregate [codegen id : 7]
+Input [5]: [i_brand#22, i_brand_id#21, i_manufact_id#23, i_manufact#24, sum#28]
+Keys [4]: [i_brand#22, i_brand_id#21, i_manufact_id#23, i_manufact#24]
 Functions [1]: [sum(UnscaledValue(ss_ext_sales_price#8))]
-Aggregate Attributes [1]: [sum(UnscaledValue(ss_ext_sales_price#8))#32]
-Results [5]: [i_brand_id#23 AS brand_id#33, i_brand#24 AS brand#34, 
i_manufact_id#25, i_manufact#26, 
MakeDecimal(sum(UnscaledValue(ss_ext_sales_price#8))#32,17,2) AS ext_price#35]
+Aggregate Attributes [1]: [sum(UnscaledValue(ss_ext_sales_price#8))#30]
+Results [5]: [i_brand_id#21 AS brand_id#31, i_brand#22 AS brand#32, 
i_manufact_id#23, i_manufact#24, 
MakeDecimal(sum(UnscaledValue(ss_ext_sales_price#8))#30,17,2) AS ext_price#33]
 
-(45) TakeOrderedAndProject
-Input [5]: [brand_id#33, brand#34, i_manufact_id#25, i_manufact#26, 
ext_price#35]
-Arguments: 100, [ext_price#35 DESC NULLS LAST, brand#34 ASC NULLS FIRST, 
brand_id#33 ASC NULLS FIRST, i_manufact_id#25 ASC NULLS FIRST, i_manufact#26 
ASC NULLS FIRST], [brand_id#33, brand#34, i_manufact_id#25, i_manufact#26, 
ext_price#35]
+(39) TakeOrderedAndProject
+Input [5]: [brand_id#31, brand#32, i_manufact_id#23, i_manufact#24, 
ext_price#33]
+Arguments: 100, [ext_price#33 DESC NULLS LAST, brand#32 ASC NULLS FIRST, 
brand_id#31 ASC NULLS FIRST, i_manufact_id#23 ASC NULLS FIRST, i_manufact#24 
ASC NULLS FIRST], [brand_id#31, brand#32, i_manufact_id#23, i_manufact#24, 
ext_price#33]
 
 ===== Subqueries =====
 
 Subquery:1 Hosting operator id = 6 Hosting Expression = ss_sold_date_sk#9 IN 
dynamicpruning#10
-ReusedExchange (46)
+ReusedExchange (40)
 
 
-(46) ReusedExchange [Reuses operator id: 5]
+(40) ReusedExchange [Reuses operator id: 5]
 Output [1]: [d_date_sk#1]
 
 
diff --git 
a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-modified/q19.sf100/simplified.txt
 
b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-modified/q19.sf100/simplified.txt
index 36933c4..6b5868a 100644
--- 
a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-modified/q19.sf100/simplified.txt
+++ 
b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-modified/q19.sf100/simplified.txt
@@ -1,76 +1,58 @@
 TakeOrderedAndProject [ext_price,brand,brand_id,i_manufact_id,i_manufact]
-  WholeStageCodegen (13)
+  WholeStageCodegen (7)
     HashAggregate [i_brand,i_brand_id,i_manufact_id,i_manufact,sum] 
[sum(UnscaledValue(ss_ext_sales_price)),brand_id,brand,ext_price,sum]
       InputAdapter
         Exchange [i_brand,i_brand_id,i_manufact_id,i_manufact] #1
-          WholeStageCodegen (12)
+          WholeStageCodegen (6)
             HashAggregate 
[i_brand,i_brand_id,i_manufact_id,i_manufact,ss_ext_sales_price] [sum,sum]
               Project 
[ss_ext_sales_price,i_brand_id,i_brand,i_manufact_id,i_manufact]
                 BroadcastHashJoin [ss_item_sk,i_item_sk]
                   Project [ss_item_sk,ss_ext_sales_price]
-                    SortMergeJoin 
[c_current_addr_sk,ca_address_sk,ca_zip,s_zip]
+                    BroadcastHashJoin 
[c_current_addr_sk,ca_address_sk,ca_zip,s_zip]
                       InputAdapter
-                        WholeStageCodegen (8)
-                          Sort [c_current_addr_sk]
-                            InputAdapter
-                              Exchange [c_current_addr_sk] #2
-                                WholeStageCodegen (7)
-                                  Project 
[ss_item_sk,ss_ext_sales_price,c_current_addr_sk,s_zip]
-                                    BroadcastHashJoin [ss_store_sk,s_store_sk]
-                                      Project 
[ss_item_sk,ss_store_sk,ss_ext_sales_price,c_current_addr_sk]
-                                        SortMergeJoin 
[ss_customer_sk,c_customer_sk]
-                                          InputAdapter
-                                            WholeStageCodegen (3)
-                                              Sort [ss_customer_sk]
-                                                InputAdapter
-                                                  Exchange [ss_customer_sk] #3
-                                                    WholeStageCodegen (2)
-                                                      Project 
[ss_item_sk,ss_customer_sk,ss_store_sk,ss_ext_sales_price]
-                                                        BroadcastHashJoin 
[d_date_sk,ss_sold_date_sk]
-                                                          InputAdapter
-                                                            BroadcastExchange 
#4
-                                                              
WholeStageCodegen (1)
-                                                                Project 
[d_date_sk]
-                                                                  Filter 
[d_moy,d_year,d_date_sk]
-                                                                    
ColumnarToRow
-                                                                      
InputAdapter
-                                                                        Scan 
parquet default.date_dim [d_date_sk,d_year,d_moy]
-                                                          Filter 
[ss_item_sk,ss_customer_sk,ss_store_sk]
-                                                            ColumnarToRow
-                                                              InputAdapter
-                                                                Scan parquet 
default.store_sales 
[ss_item_sk,ss_customer_sk,ss_store_sk,ss_ext_sales_price,ss_sold_date_sk]
-                                                                  
SubqueryBroadcast [d_date_sk] #1
-                                                                    
ReusedExchange [d_date_sk] #4
-                                          InputAdapter
-                                            WholeStageCodegen (5)
-                                              Sort [c_customer_sk]
-                                                InputAdapter
-                                                  Exchange [c_customer_sk] #5
-                                                    WholeStageCodegen (4)
-                                                      Filter 
[c_customer_sk,c_current_addr_sk]
+                        BroadcastExchange #2
+                          WholeStageCodegen (4)
+                            Project 
[ss_item_sk,ss_ext_sales_price,c_current_addr_sk,s_zip]
+                              BroadcastHashJoin [ss_store_sk,s_store_sk]
+                                Project 
[ss_item_sk,ss_store_sk,ss_ext_sales_price,c_current_addr_sk]
+                                  BroadcastHashJoin 
[ss_customer_sk,c_customer_sk]
+                                    InputAdapter
+                                      BroadcastExchange #3
+                                        WholeStageCodegen (2)
+                                          Project 
[ss_item_sk,ss_customer_sk,ss_store_sk,ss_ext_sales_price]
+                                            BroadcastHashJoin 
[d_date_sk,ss_sold_date_sk]
+                                              InputAdapter
+                                                BroadcastExchange #4
+                                                  WholeStageCodegen (1)
+                                                    Project [d_date_sk]
+                                                      Filter 
[d_moy,d_year,d_date_sk]
                                                         ColumnarToRow
                                                           InputAdapter
-                                                            Scan parquet 
default.customer [c_customer_sk,c_current_addr_sk]
-                                      InputAdapter
-                                        BroadcastExchange #6
-                                          WholeStageCodegen (6)
-                                            Filter [s_zip,s_store_sk]
-                                              ColumnarToRow
-                                                InputAdapter
-                                                  Scan parquet default.store 
[s_store_sk,s_zip]
-                      InputAdapter
-                        WholeStageCodegen (10)
-                          Sort [ca_address_sk]
-                            InputAdapter
-                              Exchange [ca_address_sk] #7
-                                WholeStageCodegen (9)
-                                  Filter [ca_address_sk,ca_zip]
-                                    ColumnarToRow
-                                      InputAdapter
-                                        Scan parquet default.customer_address 
[ca_address_sk,ca_zip]
+                                                            Scan parquet 
default.date_dim [d_date_sk,d_year,d_moy]
+                                              Filter 
[ss_item_sk,ss_customer_sk,ss_store_sk]
+                                                ColumnarToRow
+                                                  InputAdapter
+                                                    Scan parquet 
default.store_sales 
[ss_item_sk,ss_customer_sk,ss_store_sk,ss_ext_sales_price,ss_sold_date_sk]
+                                                      SubqueryBroadcast 
[d_date_sk] #1
+                                                        ReusedExchange 
[d_date_sk] #4
+                                    Filter [c_customer_sk,c_current_addr_sk]
+                                      ColumnarToRow
+                                        InputAdapter
+                                          Scan parquet default.customer 
[c_customer_sk,c_current_addr_sk]
+                                InputAdapter
+                                  BroadcastExchange #5
+                                    WholeStageCodegen (3)
+                                      Filter [s_zip,s_store_sk]
+                                        ColumnarToRow
+                                          InputAdapter
+                                            Scan parquet default.store 
[s_store_sk,s_zip]
+                      Filter [ca_address_sk,ca_zip]
+                        ColumnarToRow
+                          InputAdapter
+                            Scan parquet default.customer_address 
[ca_address_sk,ca_zip]
                   InputAdapter
-                    BroadcastExchange #8
-                      WholeStageCodegen (11)
+                    BroadcastExchange #6
+                      WholeStageCodegen (5)
                         Project 
[i_item_sk,i_brand_id,i_brand,i_manufact_id,i_manufact]
                           Filter [i_manager_id,i_item_sk]
                             ColumnarToRow
diff --git 
a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-modified/q68.sf100/explain.txt
 
b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-modified/q68.sf100/explain.txt
index 98630d2..020f734 100644
--- 
a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-modified/q68.sf100/explain.txt
+++ 
b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-modified/q68.sf100/explain.txt
@@ -1,46 +1,49 @@
 == Physical Plan ==
-TakeOrderedAndProject (42)
-+- * Project (41)
-   +- * BroadcastHashJoin Inner BuildLeft (40)
-      :- BroadcastExchange (36)
-      :  +- * Project (35)
-      :     +- * BroadcastHashJoin Inner BuildLeft (34)
-      :        :- BroadcastExchange (30)
-      :        :  +- * HashAggregate (29)
-      :        :     +- Exchange (28)
-      :        :        +- * HashAggregate (27)
-      :        :           +- * Project (26)
-      :        :              +- * BroadcastHashJoin Inner BuildLeft (25)
-      :        :                 :- BroadcastExchange (21)
-      :        :                 :  +- * Project (20)
-      :        :                 :     +- * BroadcastHashJoin Inner BuildRight 
(19)
-      :        :                 :        :- * Project (13)
-      :        :                 :        :  +- * BroadcastHashJoin Inner 
BuildRight (12)
-      :        :                 :        :     :- * Project (10)
-      :        :                 :        :     :  +- * BroadcastHashJoin 
Inner BuildRight (9)
-      :        :                 :        :     :     :- * Filter (3)
-      :        :                 :        :     :     :  +- * ColumnarToRow (2)
-      :        :                 :        :     :     :     +- Scan parquet 
default.store_sales (1)
-      :        :                 :        :     :     +- BroadcastExchange (8)
-      :        :                 :        :     :        +- * Project (7)
-      :        :                 :        :     :           +- * Filter (6)
-      :        :                 :        :     :              +- * 
ColumnarToRow (5)
-      :        :                 :        :     :                 +- Scan 
parquet default.store (4)
-      :        :                 :        :     +- ReusedExchange (11)
-      :        :                 :        +- BroadcastExchange (18)
-      :        :                 :           +- * Project (17)
-      :        :                 :              +- * Filter (16)
-      :        :                 :                 +- * ColumnarToRow (15)
-      :        :                 :                    +- Scan parquet 
default.household_demographics (14)
-      :        :                 +- * Filter (24)
-      :        :                    +- * ColumnarToRow (23)
-      :        :                       +- Scan parquet 
default.customer_address (22)
-      :        +- * Filter (33)
-      :           +- * ColumnarToRow (32)
-      :              +- Scan parquet default.customer (31)
-      +- * Filter (39)
-         +- * ColumnarToRow (38)
-            +- Scan parquet default.customer_address (37)
+TakeOrderedAndProject (45)
++- * Project (44)
+   +- * SortMergeJoin Inner (43)
+      :- * Sort (37)
+      :  +- Exchange (36)
+      :     +- * Project (35)
+      :        +- * BroadcastHashJoin Inner BuildLeft (34)
+      :           :- BroadcastExchange (30)
+      :           :  +- * HashAggregate (29)
+      :           :     +- Exchange (28)
+      :           :        +- * HashAggregate (27)
+      :           :           +- * Project (26)
+      :           :              +- * BroadcastHashJoin Inner BuildLeft (25)
+      :           :                 :- BroadcastExchange (21)
+      :           :                 :  +- * Project (20)
+      :           :                 :     +- * BroadcastHashJoin Inner 
BuildRight (19)
+      :           :                 :        :- * Project (13)
+      :           :                 :        :  +- * BroadcastHashJoin Inner 
BuildRight (12)
+      :           :                 :        :     :- * Project (6)
+      :           :                 :        :     :  +- * BroadcastHashJoin 
Inner BuildRight (5)
+      :           :                 :        :     :     :- * Filter (3)
+      :           :                 :        :     :     :  +- * ColumnarToRow 
(2)
+      :           :                 :        :     :     :     +- Scan parquet 
default.store_sales (1)
+      :           :                 :        :     :     +- ReusedExchange (4)
+      :           :                 :        :     +- BroadcastExchange (11)
+      :           :                 :        :        +- * Project (10)
+      :           :                 :        :           +- * Filter (9)
+      :           :                 :        :              +- * ColumnarToRow 
(8)
+      :           :                 :        :                 +- Scan parquet 
default.store (7)
+      :           :                 :        +- BroadcastExchange (18)
+      :           :                 :           +- * Project (17)
+      :           :                 :              +- * Filter (16)
+      :           :                 :                 +- * ColumnarToRow (15)
+      :           :                 :                    +- Scan parquet 
default.household_demographics (14)
+      :           :                 +- * Filter (24)
+      :           :                    +- * ColumnarToRow (23)
+      :           :                       +- Scan parquet 
default.customer_address (22)
+      :           +- * Filter (33)
+      :              +- * ColumnarToRow (32)
+      :                 +- Scan parquet default.customer (31)
+      +- * Sort (42)
+         +- Exchange (41)
+            +- * Filter (40)
+               +- * ColumnarToRow (39)
+                  +- Scan parquet default.customer_address (38)
 
 
 (1) Scan parquet default.store_sales
@@ -58,48 +61,48 @@ Input [9]: [ss_customer_sk#1, ss_hdemo_sk#2, ss_addr_sk#3, 
ss_store_sk#4, ss_tic
 Input [9]: [ss_customer_sk#1, ss_hdemo_sk#2, ss_addr_sk#3, ss_store_sk#4, 
ss_ticket_number#5, ss_ext_sales_price#6, ss_ext_list_price#7, ss_ext_tax#8, 
ss_sold_date_sk#9]
 Condition : (((isnotnull(ss_store_sk#4) AND isnotnull(ss_hdemo_sk#2)) AND 
isnotnull(ss_addr_sk#3)) AND isnotnull(ss_customer_sk#1))
 
-(4) Scan parquet default.store
-Output [2]: [s_store_sk#11, s_city#12]
+(4) ReusedExchange [Reuses operator id: 50]
+Output [1]: [d_date_sk#11]
+
+(5) BroadcastHashJoin [codegen id : 4]
+Left keys [1]: [ss_sold_date_sk#9]
+Right keys [1]: [d_date_sk#11]
+Join condition: None
+
+(6) Project [codegen id : 4]
+Output [8]: [ss_customer_sk#1, ss_hdemo_sk#2, ss_addr_sk#3, ss_store_sk#4, 
ss_ticket_number#5, ss_ext_sales_price#6, ss_ext_list_price#7, ss_ext_tax#8]
+Input [10]: [ss_customer_sk#1, ss_hdemo_sk#2, ss_addr_sk#3, ss_store_sk#4, 
ss_ticket_number#5, ss_ext_sales_price#6, ss_ext_list_price#7, ss_ext_tax#8, 
ss_sold_date_sk#9, d_date_sk#11]
+
+(7) Scan parquet default.store
+Output [2]: [s_store_sk#12, s_city#13]
 Batched: true
 Location [not included in comparison]/{warehouse_dir}/store]
 PushedFilters: [In(s_city, [Fairview,Midway]), IsNotNull(s_store_sk)]
 ReadSchema: struct<s_store_sk:int,s_city:string>
 
-(5) ColumnarToRow [codegen id : 1]
-Input [2]: [s_store_sk#11, s_city#12]
+(8) ColumnarToRow [codegen id : 2]
+Input [2]: [s_store_sk#12, s_city#13]
 
-(6) Filter [codegen id : 1]
-Input [2]: [s_store_sk#11, s_city#12]
-Condition : (s_city#12 IN (Midway,Fairview) AND isnotnull(s_store_sk#11))
-
-(7) Project [codegen id : 1]
-Output [1]: [s_store_sk#11]
-Input [2]: [s_store_sk#11, s_city#12]
-
-(8) BroadcastExchange
-Input [1]: [s_store_sk#11]
-Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as 
bigint)),false), [id=#13]
-
-(9) BroadcastHashJoin [codegen id : 4]
-Left keys [1]: [ss_store_sk#4]
-Right keys [1]: [s_store_sk#11]
-Join condition: None
+(9) Filter [codegen id : 2]
+Input [2]: [s_store_sk#12, s_city#13]
+Condition : (s_city#13 IN (Midway,Fairview) AND isnotnull(s_store_sk#12))
 
-(10) Project [codegen id : 4]
-Output [8]: [ss_customer_sk#1, ss_hdemo_sk#2, ss_addr_sk#3, 
ss_ticket_number#5, ss_ext_sales_price#6, ss_ext_list_price#7, ss_ext_tax#8, 
ss_sold_date_sk#9]
-Input [10]: [ss_customer_sk#1, ss_hdemo_sk#2, ss_addr_sk#3, ss_store_sk#4, 
ss_ticket_number#5, ss_ext_sales_price#6, ss_ext_list_price#7, ss_ext_tax#8, 
ss_sold_date_sk#9, s_store_sk#11]
+(10) Project [codegen id : 2]
+Output [1]: [s_store_sk#12]
+Input [2]: [s_store_sk#12, s_city#13]
 
-(11) ReusedExchange [Reuses operator id: 47]
-Output [1]: [d_date_sk#14]
+(11) BroadcastExchange
+Input [1]: [s_store_sk#12]
+Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as 
bigint)),false), [id=#14]
 
 (12) BroadcastHashJoin [codegen id : 4]
-Left keys [1]: [ss_sold_date_sk#9]
-Right keys [1]: [d_date_sk#14]
+Left keys [1]: [ss_store_sk#4]
+Right keys [1]: [s_store_sk#12]
 Join condition: None
 
 (13) Project [codegen id : 4]
 Output [7]: [ss_customer_sk#1, ss_hdemo_sk#2, ss_addr_sk#3, 
ss_ticket_number#5, ss_ext_sales_price#6, ss_ext_list_price#7, ss_ext_tax#8]
-Input [9]: [ss_customer_sk#1, ss_hdemo_sk#2, ss_addr_sk#3, ss_ticket_number#5, 
ss_ext_sales_price#6, ss_ext_list_price#7, ss_ext_tax#8, ss_sold_date_sk#9, 
d_date_sk#14]
+Input [9]: [ss_customer_sk#1, ss_hdemo_sk#2, ss_addr_sk#3, ss_store_sk#4, 
ss_ticket_number#5, ss_ext_sales_price#6, ss_ext_list_price#7, ss_ext_tax#8, 
s_store_sk#12]
 
 (14) Scan parquet default.household_demographics
 Output [3]: [hd_demo_sk#15, hd_dep_count#16, hd_vehicle_count#17]
@@ -204,67 +207,79 @@ Join condition: None
 Output [8]: [ss_ticket_number#5, bought_city#32, extended_price#33, 
list_price#34, extended_tax#35, c_current_addr_sk#38, c_first_name#39, 
c_last_name#40]
 Input [10]: [ss_ticket_number#5, ss_customer_sk#1, bought_city#32, 
extended_price#33, list_price#34, extended_tax#35, c_customer_sk#37, 
c_current_addr_sk#38, c_first_name#39, c_last_name#40]
 
-(36) BroadcastExchange
+(36) Exchange
+Input [8]: [ss_ticket_number#5, bought_city#32, extended_price#33, 
list_price#34, extended_tax#35, c_current_addr_sk#38, c_first_name#39, 
c_last_name#40]
+Arguments: hashpartitioning(c_current_addr_sk#38, 5), ENSURE_REQUIREMENTS, 
[id=#41]
+
+(37) Sort [codegen id : 8]
 Input [8]: [ss_ticket_number#5, bought_city#32, extended_price#33, 
list_price#34, extended_tax#35, c_current_addr_sk#38, c_first_name#39, 
c_last_name#40]
-Arguments: HashedRelationBroadcastMode(List(cast(input[5, int, true] as 
bigint)),false), [id=#41]
+Arguments: [c_current_addr_sk#38 ASC NULLS FIRST], false, 0
 
-(37) Scan parquet default.customer_address
+(38) Scan parquet default.customer_address
 Output [2]: [ca_address_sk#42, ca_city#43]
 Batched: true
 Location [not included in comparison]/{warehouse_dir}/customer_address]
 PushedFilters: [IsNotNull(ca_address_sk), IsNotNull(ca_city)]
 ReadSchema: struct<ca_address_sk:int,ca_city:string>
 
-(38) ColumnarToRow
+(39) ColumnarToRow [codegen id : 9]
 Input [2]: [ca_address_sk#42, ca_city#43]
 
-(39) Filter
+(40) Filter [codegen id : 9]
 Input [2]: [ca_address_sk#42, ca_city#43]
 Condition : (isnotnull(ca_address_sk#42) AND isnotnull(ca_city#43))
 
-(40) BroadcastHashJoin [codegen id : 8]
+(41) Exchange
+Input [2]: [ca_address_sk#42, ca_city#43]
+Arguments: hashpartitioning(ca_address_sk#42, 5), ENSURE_REQUIREMENTS, [id=#44]
+
+(42) Sort [codegen id : 10]
+Input [2]: [ca_address_sk#42, ca_city#43]
+Arguments: [ca_address_sk#42 ASC NULLS FIRST], false, 0
+
+(43) SortMergeJoin [codegen id : 11]
 Left keys [1]: [c_current_addr_sk#38]
 Right keys [1]: [ca_address_sk#42]
 Join condition: NOT (ca_city#43 = bought_city#32)
 
-(41) Project [codegen id : 8]
+(44) Project [codegen id : 11]
 Output [8]: [c_last_name#40, c_first_name#39, ca_city#43, bought_city#32, 
ss_ticket_number#5, extended_price#33, extended_tax#35, list_price#34]
 Input [10]: [ss_ticket_number#5, bought_city#32, extended_price#33, 
list_price#34, extended_tax#35, c_current_addr_sk#38, c_first_name#39, 
c_last_name#40, ca_address_sk#42, ca_city#43]
 
-(42) TakeOrderedAndProject
+(45) TakeOrderedAndProject
 Input [8]: [c_last_name#40, c_first_name#39, ca_city#43, bought_city#32, 
ss_ticket_number#5, extended_price#33, extended_tax#35, list_price#34]
 Arguments: 100, [c_last_name#40 ASC NULLS FIRST, ss_ticket_number#5 ASC NULLS 
FIRST], [c_last_name#40, c_first_name#39, ca_city#43, bought_city#32, 
ss_ticket_number#5, extended_price#33, extended_tax#35, list_price#34]
 
 ===== Subqueries =====
 
 Subquery:1 Hosting operator id = 1 Hosting Expression = ss_sold_date_sk#9 IN 
dynamicpruning#10
-BroadcastExchange (47)
-+- * Project (46)
-   +- * Filter (45)
-      +- * ColumnarToRow (44)
-         +- Scan parquet default.date_dim (43)
+BroadcastExchange (50)
++- * Project (49)
+   +- * Filter (48)
+      +- * ColumnarToRow (47)
+         +- Scan parquet default.date_dim (46)
 
 
-(43) Scan parquet default.date_dim
-Output [3]: [d_date_sk#14, d_year#44, d_dom#45]
+(46) Scan parquet default.date_dim
+Output [3]: [d_date_sk#11, d_year#45, d_dom#46]
 Batched: true
 Location [not included in comparison]/{warehouse_dir}/date_dim]
 PushedFilters: [IsNotNull(d_dom), GreaterThanOrEqual(d_dom,1), 
LessThanOrEqual(d_dom,2), In(d_year, [1999,2000,2001]), In(d_date_sk, 
[2451180,2451181,2451211,2451212,2451239,2451240,2451270,2451271,2451300,2451301,2451331,2451332,2451361,2451362,2451392,2451393,2451423,2451424,2451453,2451454,2451484,2451485,2451514,2451515,2451545,2451546,2451576,2451577,2451605,2451606,2451636,2451637,2451666,2451667,2451697,2451698,2451727,2451728,2451758,2451759,2451789,2451790,2451819,2451820,245185
 [...]
 ReadSchema: struct<d_date_sk:int,d_year:int,d_dom:int>
 
-(44) ColumnarToRow [codegen id : 1]
-Input [3]: [d_date_sk#14, d_year#44, d_dom#45]
+(47) ColumnarToRow [codegen id : 1]
+Input [3]: [d_date_sk#11, d_year#45, d_dom#46]
 
-(45) Filter [codegen id : 1]
-Input [3]: [d_date_sk#14, d_year#44, d_dom#45]
-Condition : (((((isnotnull(d_dom#45) AND (d_dom#45 >= 1)) AND (d_dom#45 <= 2)) 
AND d_year#44 IN (1999,2000,2001)) AND d_date_sk#14 INSET 2451180, 2451181, 
2451211, 2451212, 2451239, 2451240, 2451270, 2451271, 2451300, 2451301, 
2451331, 2451332, 2451361, 2451362, 2451392, 2451393, 2451423, 2451424, 
2451453, 2451454, 2451484, 2451485, 2451514, 2451515, 2451545, 2451546, 
2451576, 2451577, 2451605, 2451606, 2451636, 2451637, 2451666, 2451667, 
2451697, 2451698, 2451727, 2451728, 2451758, 2451 [...]
+(48) Filter [codegen id : 1]
+Input [3]: [d_date_sk#11, d_year#45, d_dom#46]
+Condition : (((((isnotnull(d_dom#46) AND (d_dom#46 >= 1)) AND (d_dom#46 <= 2)) 
AND d_year#45 IN (1999,2000,2001)) AND d_date_sk#11 INSET 2451180, 2451181, 
2451211, 2451212, 2451239, 2451240, 2451270, 2451271, 2451300, 2451301, 
2451331, 2451332, 2451361, 2451362, 2451392, 2451393, 2451423, 2451424, 
2451453, 2451454, 2451484, 2451485, 2451514, 2451515, 2451545, 2451546, 
2451576, 2451577, 2451605, 2451606, 2451636, 2451637, 2451666, 2451667, 
2451697, 2451698, 2451727, 2451728, 2451758, 2451 [...]
 
-(46) Project [codegen id : 1]
-Output [1]: [d_date_sk#14]
-Input [3]: [d_date_sk#14, d_year#44, d_dom#45]
+(49) Project [codegen id : 1]
+Output [1]: [d_date_sk#11]
+Input [3]: [d_date_sk#11, d_year#45, d_dom#46]
 
-(47) BroadcastExchange
-Input [1]: [d_date_sk#14]
-Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as 
bigint)),false), [id=#46]
+(50) BroadcastExchange
+Input [1]: [d_date_sk#11]
+Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as 
bigint)),false), [id=#47]
 
 
diff --git 
a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-modified/q68.sf100/simplified.txt
 
b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-modified/q68.sf100/simplified.txt
index 2c8c8cc..7a7c90a 100644
--- 
a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-modified/q68.sf100/simplified.txt
+++ 
b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-modified/q68.sf100/simplified.txt
@@ -1,70 +1,79 @@
 TakeOrderedAndProject 
[c_last_name,ss_ticket_number,c_first_name,ca_city,bought_city,extended_price,extended_tax,list_price]
-  WholeStageCodegen (8)
+  WholeStageCodegen (11)
     Project 
[c_last_name,c_first_name,ca_city,bought_city,ss_ticket_number,extended_price,extended_tax,list_price]
-      BroadcastHashJoin [c_current_addr_sk,ca_address_sk,ca_city,bought_city]
+      SortMergeJoin [c_current_addr_sk,ca_address_sk,ca_city,bought_city]
         InputAdapter
-          BroadcastExchange #1
-            WholeStageCodegen (7)
-              Project 
[ss_ticket_number,bought_city,extended_price,list_price,extended_tax,c_current_addr_sk,c_first_name,c_last_name]
-                BroadcastHashJoin [ss_customer_sk,c_customer_sk]
-                  InputAdapter
-                    BroadcastExchange #2
-                      WholeStageCodegen (6)
-                        HashAggregate 
[ss_ticket_number,ss_customer_sk,ss_addr_sk,ca_city,sum,sum,sum] 
[sum(UnscaledValue(ss_ext_sales_price)),sum(UnscaledValue(ss_ext_list_price)),sum(UnscaledValue(ss_ext_tax)),bought_city,extended_price,list_price,extended_tax,sum,sum,sum]
-                          InputAdapter
-                            Exchange 
[ss_ticket_number,ss_customer_sk,ss_addr_sk,ca_city] #3
-                              WholeStageCodegen (5)
-                                HashAggregate 
[ss_ticket_number,ss_customer_sk,ss_addr_sk,ca_city,ss_ext_sales_price,ss_ext_list_price,ss_ext_tax]
 [sum,sum,sum,sum,sum,sum]
-                                  Project 
[ss_customer_sk,ss_addr_sk,ss_ticket_number,ss_ext_sales_price,ss_ext_list_price,ss_ext_tax,ca_city]
-                                    BroadcastHashJoin 
[ss_addr_sk,ca_address_sk]
-                                      InputAdapter
-                                        BroadcastExchange #4
-                                          WholeStageCodegen (4)
-                                            Project 
[ss_customer_sk,ss_addr_sk,ss_ticket_number,ss_ext_sales_price,ss_ext_list_price,ss_ext_tax]
-                                              BroadcastHashJoin 
[ss_hdemo_sk,hd_demo_sk]
-                                                Project 
[ss_customer_sk,ss_hdemo_sk,ss_addr_sk,ss_ticket_number,ss_ext_sales_price,ss_ext_list_price,ss_ext_tax]
-                                                  BroadcastHashJoin 
[ss_sold_date_sk,d_date_sk]
-                                                    Project 
[ss_customer_sk,ss_hdemo_sk,ss_addr_sk,ss_ticket_number,ss_ext_sales_price,ss_ext_list_price,ss_ext_tax,ss_sold_date_sk]
-                                                      BroadcastHashJoin 
[ss_store_sk,s_store_sk]
-                                                        Filter 
[ss_store_sk,ss_hdemo_sk,ss_addr_sk,ss_customer_sk]
-                                                          ColumnarToRow
-                                                            InputAdapter
-                                                              Scan parquet 
default.store_sales 
[ss_customer_sk,ss_hdemo_sk,ss_addr_sk,ss_store_sk,ss_ticket_number,ss_ext_sales_price,ss_ext_list_price,ss_ext_tax,ss_sold_date_sk]
-                                                                
SubqueryBroadcast [d_date_sk] #1
-                                                                  
BroadcastExchange #5
-                                                                    
WholeStageCodegen (1)
-                                                                      Project 
[d_date_sk]
-                                                                        Filter 
[d_dom,d_year,d_date_sk]
-                                                                          
ColumnarToRow
-                                                                            
InputAdapter
-                                                                              
Scan parquet default.date_dim [d_date_sk,d_year,d_dom]
-                                                        InputAdapter
-                                                          BroadcastExchange #6
-                                                            WholeStageCodegen 
(1)
-                                                              Project 
[s_store_sk]
-                                                                Filter 
[s_city,s_store_sk]
-                                                                  ColumnarToRow
-                                                                    
InputAdapter
-                                                                      Scan 
parquet default.store [s_store_sk,s_city]
-                                                    InputAdapter
-                                                      ReusedExchange 
[d_date_sk] #5
+          WholeStageCodegen (8)
+            Sort [c_current_addr_sk]
+              InputAdapter
+                Exchange [c_current_addr_sk] #1
+                  WholeStageCodegen (7)
+                    Project 
[ss_ticket_number,bought_city,extended_price,list_price,extended_tax,c_current_addr_sk,c_first_name,c_last_name]
+                      BroadcastHashJoin [ss_customer_sk,c_customer_sk]
+                        InputAdapter
+                          BroadcastExchange #2
+                            WholeStageCodegen (6)
+                              HashAggregate 
[ss_ticket_number,ss_customer_sk,ss_addr_sk,ca_city,sum,sum,sum] 
[sum(UnscaledValue(ss_ext_sales_price)),sum(UnscaledValue(ss_ext_list_price)),sum(UnscaledValue(ss_ext_tax)),bought_city,extended_price,list_price,extended_tax,sum,sum,sum]
+                                InputAdapter
+                                  Exchange 
[ss_ticket_number,ss_customer_sk,ss_addr_sk,ca_city] #3
+                                    WholeStageCodegen (5)
+                                      HashAggregate 
[ss_ticket_number,ss_customer_sk,ss_addr_sk,ca_city,ss_ext_sales_price,ss_ext_list_price,ss_ext_tax]
 [sum,sum,sum,sum,sum,sum]
+                                        Project 
[ss_customer_sk,ss_addr_sk,ss_ticket_number,ss_ext_sales_price,ss_ext_list_price,ss_ext_tax,ca_city]
+                                          BroadcastHashJoin 
[ss_addr_sk,ca_address_sk]
+                                            InputAdapter
+                                              BroadcastExchange #4
+                                                WholeStageCodegen (4)
+                                                  Project 
[ss_customer_sk,ss_addr_sk,ss_ticket_number,ss_ext_sales_price,ss_ext_list_price,ss_ext_tax]
+                                                    BroadcastHashJoin 
[ss_hdemo_sk,hd_demo_sk]
+                                                      Project 
[ss_customer_sk,ss_hdemo_sk,ss_addr_sk,ss_ticket_number,ss_ext_sales_price,ss_ext_list_price,ss_ext_tax]
+                                                        BroadcastHashJoin 
[ss_store_sk,s_store_sk]
+                                                          Project 
[ss_customer_sk,ss_hdemo_sk,ss_addr_sk,ss_store_sk,ss_ticket_number,ss_ext_sales_price,ss_ext_list_price,ss_ext_tax]
+                                                            BroadcastHashJoin 
[ss_sold_date_sk,d_date_sk]
+                                                              Filter 
[ss_store_sk,ss_hdemo_sk,ss_addr_sk,ss_customer_sk]
+                                                                ColumnarToRow
+                                                                  InputAdapter
+                                                                    Scan 
parquet default.store_sales 
[ss_customer_sk,ss_hdemo_sk,ss_addr_sk,ss_store_sk,ss_ticket_number,ss_ext_sales_price,ss_ext_list_price,ss_ext_tax,ss_sold_date_sk]
+                                                                      
SubqueryBroadcast [d_date_sk] #1
+                                                                        
BroadcastExchange #5
+                                                                          
WholeStageCodegen (1)
+                                                                            
Project [d_date_sk]
+                                                                              
Filter [d_dom,d_year,d_date_sk]
+                                                                               
 ColumnarToRow
+                                                                               
   InputAdapter
+                                                                               
     Scan parquet default.date_dim [d_date_sk,d_year,d_dom]
+                                                              InputAdapter
+                                                                ReusedExchange 
[d_date_sk] #5
+                                                          InputAdapter
+                                                            BroadcastExchange 
#6
+                                                              
WholeStageCodegen (2)
+                                                                Project 
[s_store_sk]
+                                                                  Filter 
[s_city,s_store_sk]
+                                                                    
ColumnarToRow
+                                                                      
InputAdapter
+                                                                        Scan 
parquet default.store [s_store_sk,s_city]
+                                                      InputAdapter
+                                                        BroadcastExchange #7
+                                                          WholeStageCodegen (3)
+                                                            Project 
[hd_demo_sk]
+                                                              Filter 
[hd_dep_count,hd_vehicle_count,hd_demo_sk]
+                                                                ColumnarToRow
+                                                                  InputAdapter
+                                                                    Scan 
parquet default.household_demographics 
[hd_demo_sk,hd_dep_count,hd_vehicle_count]
+                                            Filter [ca_address_sk,ca_city]
+                                              ColumnarToRow
                                                 InputAdapter
-                                                  BroadcastExchange #7
-                                                    WholeStageCodegen (3)
-                                                      Project [hd_demo_sk]
-                                                        Filter 
[hd_dep_count,hd_vehicle_count,hd_demo_sk]
-                                                          ColumnarToRow
-                                                            InputAdapter
-                                                              Scan parquet 
default.household_demographics [hd_demo_sk,hd_dep_count,hd_vehicle_count]
-                                      Filter [ca_address_sk,ca_city]
-                                        ColumnarToRow
-                                          InputAdapter
-                                            Scan parquet 
default.customer_address [ca_address_sk,ca_city]
-                  Filter [c_customer_sk,c_current_addr_sk]
-                    ColumnarToRow
-                      InputAdapter
-                        Scan parquet default.customer 
[c_customer_sk,c_current_addr_sk,c_first_name,c_last_name]
-        Filter [ca_address_sk,ca_city]
-          ColumnarToRow
-            InputAdapter
-              Scan parquet default.customer_address [ca_address_sk,ca_city]
+                                                  Scan parquet 
default.customer_address [ca_address_sk,ca_city]
+                        Filter [c_customer_sk,c_current_addr_sk]
+                          ColumnarToRow
+                            InputAdapter
+                              Scan parquet default.customer 
[c_customer_sk,c_current_addr_sk,c_first_name,c_last_name]
+        InputAdapter
+          WholeStageCodegen (10)
+            Sort [ca_address_sk]
+              InputAdapter
+                Exchange [ca_address_sk] #8
+                  WholeStageCodegen (9)
+                    Filter [ca_address_sk,ca_city]
+                      ColumnarToRow
+                        InputAdapter
+                          Scan parquet default.customer_address 
[ca_address_sk,ca_city]
diff --git 
a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-modified/q73.sf100/explain.txt
 
b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-modified/q73.sf100/explain.txt
index 03242ee..1419499 100644
--- 
a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-modified/q73.sf100/explain.txt
+++ 
b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-modified/q73.sf100/explain.txt
@@ -1,204 +1,214 @@
 == Physical Plan ==
-* Sort (36)
-+- Exchange (35)
-   +- * Project (34)
-      +- * BroadcastHashJoin Inner BuildLeft (33)
-         :- BroadcastExchange (29)
-         :  +- * Filter (28)
-         :     +- * HashAggregate (27)
-         :        +- Exchange (26)
-         :           +- * HashAggregate (25)
-         :              +- * Project (24)
-         :                 +- * BroadcastHashJoin Inner BuildLeft (23)
-         :                    :- BroadcastExchange (18)
-         :                    :  +- * Project (17)
-         :                    :     +- * BroadcastHashJoin Inner BuildLeft (16)
-         :                    :        :- BroadcastExchange (11)
-         :                    :        :  +- * Project (10)
-         :                    :        :     +- * BroadcastHashJoin Inner 
BuildLeft (9)
-         :                    :        :        :- BroadcastExchange (4)
-         :                    :        :        :  +- * Filter (3)
-         :                    :        :        :     +- * ColumnarToRow (2)
-         :                    :        :        :        +- Scan parquet 
default.store_sales (1)
-         :                    :        :        +- * Project (8)
-         :                    :        :           +- * Filter (7)
-         :                    :        :              +- * ColumnarToRow (6)
-         :                    :        :                 +- Scan parquet 
default.date_dim (5)
-         :                    :        +- * Project (15)
-         :                    :           +- * Filter (14)
-         :                    :              +- * ColumnarToRow (13)
-         :                    :                 +- Scan parquet default.store 
(12)
-         :                    +- * Project (22)
-         :                       +- * Filter (21)
-         :                          +- * ColumnarToRow (20)
-         :                             +- Scan parquet 
default.household_demographics (19)
-         +- * Filter (32)
-            +- * ColumnarToRow (31)
-               +- Scan parquet default.customer (30)
+* Sort (32)
++- Exchange (31)
+   +- * Project (30)
+      +- * BroadcastHashJoin Inner BuildLeft (29)
+         :- BroadcastExchange (25)
+         :  +- * Filter (24)
+         :     +- * HashAggregate (23)
+         :        +- Exchange (22)
+         :           +- * HashAggregate (21)
+         :              +- * Project (20)
+         :                 +- * BroadcastHashJoin Inner BuildRight (19)
+         :                    :- * Project (13)
+         :                    :  +- * BroadcastHashJoin Inner BuildRight (12)
+         :                    :     :- * Project (6)
+         :                    :     :  +- * BroadcastHashJoin Inner BuildRight 
(5)
+         :                    :     :     :- * Filter (3)
+         :                    :     :     :  +- * ColumnarToRow (2)
+         :                    :     :     :     +- Scan parquet 
default.store_sales (1)
+         :                    :     :     +- ReusedExchange (4)
+         :                    :     +- BroadcastExchange (11)
+         :                    :        +- * Project (10)
+         :                    :           +- * Filter (9)
+         :                    :              +- * ColumnarToRow (8)
+         :                    :                 +- Scan parquet default.store 
(7)
+         :                    +- BroadcastExchange (18)
+         :                       +- * Project (17)
+         :                          +- * Filter (16)
+         :                             +- * ColumnarToRow (15)
+         :                                +- Scan parquet 
default.household_demographics (14)
+         +- * Filter (28)
+            +- * ColumnarToRow (27)
+               +- Scan parquet default.customer (26)
 
 
 (1) Scan parquet default.store_sales
 Output [5]: [ss_customer_sk#1, ss_hdemo_sk#2, ss_store_sk#3, 
ss_ticket_number#4, ss_sold_date_sk#5]
 Batched: true
 Location: InMemoryFileIndex []
-PartitionFilters: [ss_sold_date_sk#5 INSET 2450815, 2450816, 2450846, 2450847, 
2450874, 2450875, 2450905, 2450906, 2450935, 2450936, 2450966, 2450967, 
2450996, 2450997, 2451027, 2451028, 2451058, 2451059, 2451088, 2451089, 
2451119, 2451120, 2451149, 2451150, 2451180, 2451181, 2451211, 2451212, 
2451239, 2451240, 2451270, 2451271, 2451300, 2451301, 2451331, 2451332, 
2451361, 2451362, 2451392, 2451393, 2451423, 2451424, 2451453, 2451454, 
2451484, 2451485, 2451514, 2451515, 2451545, 2451546, [...]
+PartitionFilters: [ss_sold_date_sk#5 INSET 2450815, 2450816, 2450846, 2450847, 
2450874, 2450875, 2450905, 2450906, 2450935, 2450936, 2450966, 2450967, 
2450996, 2450997, 2451027, 2451028, 2451058, 2451059, 2451088, 2451089, 
2451119, 2451120, 2451149, 2451150, 2451180, 2451181, 2451211, 2451212, 
2451239, 2451240, 2451270, 2451271, 2451300, 2451301, 2451331, 2451332, 
2451361, 2451362, 2451392, 2451393, 2451423, 2451424, 2451453, 2451454, 
2451484, 2451485, 2451514, 2451515, 2451545, 2451546, [...]
 PushedFilters: [IsNotNull(ss_store_sk), IsNotNull(ss_hdemo_sk), 
IsNotNull(ss_customer_sk)]
 ReadSchema: 
struct<ss_customer_sk:int,ss_hdemo_sk:int,ss_store_sk:int,ss_ticket_number:int>
 
-(2) ColumnarToRow [codegen id : 1]
+(2) ColumnarToRow [codegen id : 4]
 Input [5]: [ss_customer_sk#1, ss_hdemo_sk#2, ss_store_sk#3, 
ss_ticket_number#4, ss_sold_date_sk#5]
 
-(3) Filter [codegen id : 1]
+(3) Filter [codegen id : 4]
 Input [5]: [ss_customer_sk#1, ss_hdemo_sk#2, ss_store_sk#3, 
ss_ticket_number#4, ss_sold_date_sk#5]
 Condition : ((isnotnull(ss_store_sk#3) AND isnotnull(ss_hdemo_sk#2)) AND 
isnotnull(ss_customer_sk#1))
 
-(4) BroadcastExchange
-Input [5]: [ss_customer_sk#1, ss_hdemo_sk#2, ss_store_sk#3, 
ss_ticket_number#4, ss_sold_date_sk#5]
-Arguments: HashedRelationBroadcastMode(List(cast(input[4, int, true] as 
bigint)),false), [id=#6]
-
-(5) Scan parquet default.date_dim
-Output [3]: [d_date_sk#7, d_year#8, d_dom#9]
-Batched: true
-Location [not included in comparison]/{warehouse_dir}/date_dim]
-PushedFilters: [IsNotNull(d_dom), GreaterThanOrEqual(d_dom,1), 
LessThanOrEqual(d_dom,2), In(d_year, [1998,1999,2000]), In(d_date_sk, 
[2450815,2450816,2450846,2450847,2450874,2450875,2450905,2450906,2450935,2450936,2450966,2450967,2450996,2450997,2451027,2451028,2451058,2451059,2451088,2451089,2451119,2451120,2451149,2451150,2451180,2451181,2451211,2451212,2451239,2451240,2451270,2451271,2451300,2451301,2451331,2451332,2451361,2451362,2451392,2451393,2451423,2451424,2451453,2451454,245148
 [...]
-ReadSchema: struct<d_date_sk:int,d_year:int,d_dom:int>
-
-(6) ColumnarToRow
-Input [3]: [d_date_sk#7, d_year#8, d_dom#9]
-
-(7) Filter
-Input [3]: [d_date_sk#7, d_year#8, d_dom#9]
-Condition : (((((isnotnull(d_dom#9) AND (d_dom#9 >= 1)) AND (d_dom#9 <= 2)) 
AND d_year#8 IN (1998,1999,2000)) AND d_date_sk#7 INSET 2450815, 2450816, 
2450846, 2450847, 2450874, 2450875, 2450905, 2450906, 2450935, 2450936, 
2450966, 2450967, 2450996, 2450997, 2451027, 2451028, 2451058, 2451059, 
2451088, 2451089, 2451119, 2451120, 2451149, 2451150, 2451180, 2451181, 
2451211, 2451212, 2451239, 2451240, 2451270, 2451271, 2451300, 2451301, 
2451331, 2451332, 2451361, 2451362, 2451392, 2451393,  [...]
-
-(8) Project
+(4) ReusedExchange [Reuses operator id: 37]
 Output [1]: [d_date_sk#7]
-Input [3]: [d_date_sk#7, d_year#8, d_dom#9]
 
-(9) BroadcastHashJoin [codegen id : 2]
+(5) BroadcastHashJoin [codegen id : 4]
 Left keys [1]: [ss_sold_date_sk#5]
 Right keys [1]: [d_date_sk#7]
 Join condition: None
 
-(10) Project [codegen id : 2]
+(6) Project [codegen id : 4]
 Output [4]: [ss_customer_sk#1, ss_hdemo_sk#2, ss_store_sk#3, 
ss_ticket_number#4]
 Input [6]: [ss_customer_sk#1, ss_hdemo_sk#2, ss_store_sk#3, 
ss_ticket_number#4, ss_sold_date_sk#5, d_date_sk#7]
 
-(11) BroadcastExchange
-Input [4]: [ss_customer_sk#1, ss_hdemo_sk#2, ss_store_sk#3, ss_ticket_number#4]
-Arguments: HashedRelationBroadcastMode(List(cast(input[2, int, true] as 
bigint)),false), [id=#10]
-
-(12) Scan parquet default.store
-Output [2]: [s_store_sk#11, s_county#12]
+(7) Scan parquet default.store
+Output [2]: [s_store_sk#8, s_county#9]
 Batched: true
 Location [not included in comparison]/{warehouse_dir}/store]
 PushedFilters: [In(s_county, [Barrow County,Bronx County,Fairfield 
County,Ziebach County]), IsNotNull(s_store_sk)]
 ReadSchema: struct<s_store_sk:int,s_county:string>
 
-(13) ColumnarToRow
-Input [2]: [s_store_sk#11, s_county#12]
+(8) ColumnarToRow [codegen id : 2]
+Input [2]: [s_store_sk#8, s_county#9]
 
-(14) Filter
-Input [2]: [s_store_sk#11, s_county#12]
-Condition : (s_county#12 IN (Fairfield County,Ziebach County,Bronx 
County,Barrow County) AND isnotnull(s_store_sk#11))
+(9) Filter [codegen id : 2]
+Input [2]: [s_store_sk#8, s_county#9]
+Condition : (s_county#9 IN (Fairfield County,Ziebach County,Bronx 
County,Barrow County) AND isnotnull(s_store_sk#8))
 
-(15) Project
-Output [1]: [s_store_sk#11]
-Input [2]: [s_store_sk#11, s_county#12]
+(10) Project [codegen id : 2]
+Output [1]: [s_store_sk#8]
+Input [2]: [s_store_sk#8, s_county#9]
 
-(16) BroadcastHashJoin [codegen id : 3]
+(11) BroadcastExchange
+Input [1]: [s_store_sk#8]
+Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as 
bigint)),false), [id=#10]
+
+(12) BroadcastHashJoin [codegen id : 4]
 Left keys [1]: [ss_store_sk#3]
-Right keys [1]: [s_store_sk#11]
+Right keys [1]: [s_store_sk#8]
 Join condition: None
 
-(17) Project [codegen id : 3]
+(13) Project [codegen id : 4]
 Output [3]: [ss_customer_sk#1, ss_hdemo_sk#2, ss_ticket_number#4]
-Input [5]: [ss_customer_sk#1, ss_hdemo_sk#2, ss_store_sk#3, 
ss_ticket_number#4, s_store_sk#11]
-
-(18) BroadcastExchange
-Input [3]: [ss_customer_sk#1, ss_hdemo_sk#2, ss_ticket_number#4]
-Arguments: HashedRelationBroadcastMode(List(cast(input[1, int, true] as 
bigint)),false), [id=#13]
+Input [5]: [ss_customer_sk#1, ss_hdemo_sk#2, ss_store_sk#3, 
ss_ticket_number#4, s_store_sk#8]
 
-(19) Scan parquet default.household_demographics
-Output [4]: [hd_demo_sk#14, hd_buy_potential#15, hd_dep_count#16, 
hd_vehicle_count#17]
+(14) Scan parquet default.household_demographics
+Output [4]: [hd_demo_sk#11, hd_buy_potential#12, hd_dep_count#13, 
hd_vehicle_count#14]
 Batched: true
 Location [not included in comparison]/{warehouse_dir}/household_demographics]
 PushedFilters: [IsNotNull(hd_vehicle_count), IsNotNull(hd_dep_count), 
Or(EqualTo(hd_buy_potential,>10000         ),EqualTo(hd_buy_potential,Unknown   
     )), GreaterThan(hd_vehicle_count,0), IsNotNull(hd_demo_sk)]
 ReadSchema: 
struct<hd_demo_sk:int,hd_buy_potential:string,hd_dep_count:int,hd_vehicle_count:int>
 
-(20) ColumnarToRow
-Input [4]: [hd_demo_sk#14, hd_buy_potential#15, hd_dep_count#16, 
hd_vehicle_count#17]
+(15) ColumnarToRow [codegen id : 3]
+Input [4]: [hd_demo_sk#11, hd_buy_potential#12, hd_dep_count#13, 
hd_vehicle_count#14]
 
-(21) Filter
-Input [4]: [hd_demo_sk#14, hd_buy_potential#15, hd_dep_count#16, 
hd_vehicle_count#17]
-Condition : (((((isnotnull(hd_vehicle_count#17) AND 
isnotnull(hd_dep_count#16)) AND ((hd_buy_potential#15 = >10000         ) OR 
(hd_buy_potential#15 = Unknown        ))) AND (hd_vehicle_count#17 > 0)) AND 
((cast(hd_dep_count#16 as double) / cast(hd_vehicle_count#17 as double)) > 
1.0)) AND isnotnull(hd_demo_sk#14))
+(16) Filter [codegen id : 3]
+Input [4]: [hd_demo_sk#11, hd_buy_potential#12, hd_dep_count#13, 
hd_vehicle_count#14]
+Condition : (((((isnotnull(hd_vehicle_count#14) AND 
isnotnull(hd_dep_count#13)) AND ((hd_buy_potential#12 = >10000         ) OR 
(hd_buy_potential#12 = Unknown        ))) AND (hd_vehicle_count#14 > 0)) AND 
((cast(hd_dep_count#13 as double) / cast(hd_vehicle_count#14 as double)) > 
1.0)) AND isnotnull(hd_demo_sk#11))
 
-(22) Project
-Output [1]: [hd_demo_sk#14]
-Input [4]: [hd_demo_sk#14, hd_buy_potential#15, hd_dep_count#16, 
hd_vehicle_count#17]
+(17) Project [codegen id : 3]
+Output [1]: [hd_demo_sk#11]
+Input [4]: [hd_demo_sk#11, hd_buy_potential#12, hd_dep_count#13, 
hd_vehicle_count#14]
+
+(18) BroadcastExchange
+Input [1]: [hd_demo_sk#11]
+Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as 
bigint)),false), [id=#15]
 
-(23) BroadcastHashJoin [codegen id : 4]
+(19) BroadcastHashJoin [codegen id : 4]
 Left keys [1]: [ss_hdemo_sk#2]
-Right keys [1]: [hd_demo_sk#14]
+Right keys [1]: [hd_demo_sk#11]
 Join condition: None
 
-(24) Project [codegen id : 4]
+(20) Project [codegen id : 4]
 Output [2]: [ss_customer_sk#1, ss_ticket_number#4]
-Input [4]: [ss_customer_sk#1, ss_hdemo_sk#2, ss_ticket_number#4, hd_demo_sk#14]
+Input [4]: [ss_customer_sk#1, ss_hdemo_sk#2, ss_ticket_number#4, hd_demo_sk#11]
 
-(25) HashAggregate [codegen id : 4]
+(21) HashAggregate [codegen id : 4]
 Input [2]: [ss_customer_sk#1, ss_ticket_number#4]
 Keys [2]: [ss_ticket_number#4, ss_customer_sk#1]
 Functions [1]: [partial_count(1)]
-Aggregate Attributes [1]: [count#18]
-Results [3]: [ss_ticket_number#4, ss_customer_sk#1, count#19]
+Aggregate Attributes [1]: [count#16]
+Results [3]: [ss_ticket_number#4, ss_customer_sk#1, count#17]
 
-(26) Exchange
-Input [3]: [ss_ticket_number#4, ss_customer_sk#1, count#19]
-Arguments: hashpartitioning(ss_ticket_number#4, ss_customer_sk#1, 5), 
ENSURE_REQUIREMENTS, [id=#20]
+(22) Exchange
+Input [3]: [ss_ticket_number#4, ss_customer_sk#1, count#17]
+Arguments: hashpartitioning(ss_ticket_number#4, ss_customer_sk#1, 5), 
ENSURE_REQUIREMENTS, [id=#18]
 
-(27) HashAggregate [codegen id : 5]
-Input [3]: [ss_ticket_number#4, ss_customer_sk#1, count#19]
+(23) HashAggregate [codegen id : 5]
+Input [3]: [ss_ticket_number#4, ss_customer_sk#1, count#17]
 Keys [2]: [ss_ticket_number#4, ss_customer_sk#1]
 Functions [1]: [count(1)]
-Aggregate Attributes [1]: [count(1)#21]
-Results [3]: [ss_ticket_number#4, ss_customer_sk#1, count(1)#21 AS cnt#22]
+Aggregate Attributes [1]: [count(1)#19]
+Results [3]: [ss_ticket_number#4, ss_customer_sk#1, count(1)#19 AS cnt#20]
 
-(28) Filter [codegen id : 5]
-Input [3]: [ss_ticket_number#4, ss_customer_sk#1, cnt#22]
-Condition : ((cnt#22 >= 1) AND (cnt#22 <= 5))
+(24) Filter [codegen id : 5]
+Input [3]: [ss_ticket_number#4, ss_customer_sk#1, cnt#20]
+Condition : ((cnt#20 >= 1) AND (cnt#20 <= 5))
 
-(29) BroadcastExchange
-Input [3]: [ss_ticket_number#4, ss_customer_sk#1, cnt#22]
-Arguments: HashedRelationBroadcastMode(List(cast(input[1, int, true] as 
bigint)),false), [id=#23]
+(25) BroadcastExchange
+Input [3]: [ss_ticket_number#4, ss_customer_sk#1, cnt#20]
+Arguments: HashedRelationBroadcastMode(List(cast(input[1, int, true] as 
bigint)),false), [id=#21]
 
-(30) Scan parquet default.customer
-Output [5]: [c_customer_sk#24, c_salutation#25, c_first_name#26, 
c_last_name#27, c_preferred_cust_flag#28]
+(26) Scan parquet default.customer
+Output [5]: [c_customer_sk#22, c_salutation#23, c_first_name#24, 
c_last_name#25, c_preferred_cust_flag#26]
 Batched: true
 Location [not included in comparison]/{warehouse_dir}/customer]
 PushedFilters: [IsNotNull(c_customer_sk)]
 ReadSchema: 
struct<c_customer_sk:int,c_salutation:string,c_first_name:string,c_last_name:string,c_preferred_cust_flag:string>
 
-(31) ColumnarToRow
-Input [5]: [c_customer_sk#24, c_salutation#25, c_first_name#26, 
c_last_name#27, c_preferred_cust_flag#28]
+(27) ColumnarToRow
+Input [5]: [c_customer_sk#22, c_salutation#23, c_first_name#24, 
c_last_name#25, c_preferred_cust_flag#26]
 
-(32) Filter
-Input [5]: [c_customer_sk#24, c_salutation#25, c_first_name#26, 
c_last_name#27, c_preferred_cust_flag#28]
-Condition : isnotnull(c_customer_sk#24)
+(28) Filter
+Input [5]: [c_customer_sk#22, c_salutation#23, c_first_name#24, 
c_last_name#25, c_preferred_cust_flag#26]
+Condition : isnotnull(c_customer_sk#22)
 
-(33) BroadcastHashJoin [codegen id : 6]
+(29) BroadcastHashJoin [codegen id : 6]
 Left keys [1]: [ss_customer_sk#1]
-Right keys [1]: [c_customer_sk#24]
+Right keys [1]: [c_customer_sk#22]
 Join condition: None
 
-(34) Project [codegen id : 6]
-Output [6]: [c_last_name#27, c_first_name#26, c_salutation#25, 
c_preferred_cust_flag#28, ss_ticket_number#4, cnt#22]
-Input [8]: [ss_ticket_number#4, ss_customer_sk#1, cnt#22, c_customer_sk#24, 
c_salutation#25, c_first_name#26, c_last_name#27, c_preferred_cust_flag#28]
+(30) Project [codegen id : 6]
+Output [6]: [c_last_name#25, c_first_name#24, c_salutation#23, 
c_preferred_cust_flag#26, ss_ticket_number#4, cnt#20]
+Input [8]: [ss_ticket_number#4, ss_customer_sk#1, cnt#20, c_customer_sk#22, 
c_salutation#23, c_first_name#24, c_last_name#25, c_preferred_cust_flag#26]
+
+(31) Exchange
+Input [6]: [c_last_name#25, c_first_name#24, c_salutation#23, 
c_preferred_cust_flag#26, ss_ticket_number#4, cnt#20]
+Arguments: rangepartitioning(cnt#20 DESC NULLS LAST, 5), ENSURE_REQUIREMENTS, 
[id=#27]
+
+(32) Sort [codegen id : 7]
+Input [6]: [c_last_name#25, c_first_name#24, c_salutation#23, 
c_preferred_cust_flag#26, ss_ticket_number#4, cnt#20]
+Arguments: [cnt#20 DESC NULLS LAST], true, 0
+
+===== Subqueries =====
+
+Subquery:1 Hosting operator id = 1 Hosting Expression = ss_sold_date_sk#5 IN 
dynamicpruning#6
+BroadcastExchange (37)
++- * Project (36)
+   +- * Filter (35)
+      +- * ColumnarToRow (34)
+         +- Scan parquet default.date_dim (33)
+
+
+(33) Scan parquet default.date_dim
+Output [3]: [d_date_sk#7, d_year#28, d_dom#29]
+Batched: true
+Location [not included in comparison]/{warehouse_dir}/date_dim]
+PushedFilters: [IsNotNull(d_dom), GreaterThanOrEqual(d_dom,1), 
LessThanOrEqual(d_dom,2), In(d_year, [1998,1999,2000]), In(d_date_sk, 
[2450815,2450816,2450846,2450847,2450874,2450875,2450905,2450906,2450935,2450936,2450966,2450967,2450996,2450997,2451027,2451028,2451058,2451059,2451088,2451089,2451119,2451120,2451149,2451150,2451180,2451181,2451211,2451212,2451239,2451240,2451270,2451271,2451300,2451301,2451331,2451332,2451361,2451362,2451392,2451393,2451423,2451424,2451453,2451454,245148
 [...]
+ReadSchema: struct<d_date_sk:int,d_year:int,d_dom:int>
+
+(34) ColumnarToRow [codegen id : 1]
+Input [3]: [d_date_sk#7, d_year#28, d_dom#29]
+
+(35) Filter [codegen id : 1]
+Input [3]: [d_date_sk#7, d_year#28, d_dom#29]
+Condition : (((((isnotnull(d_dom#29) AND (d_dom#29 >= 1)) AND (d_dom#29 <= 2)) 
AND d_year#28 IN (1998,1999,2000)) AND d_date_sk#7 INSET 2450815, 2450816, 
2450846, 2450847, 2450874, 2450875, 2450905, 2450906, 2450935, 2450936, 
2450966, 2450967, 2450996, 2450997, 2451027, 2451028, 2451058, 2451059, 
2451088, 2451089, 2451119, 2451120, 2451149, 2451150, 2451180, 2451181, 
2451211, 2451212, 2451239, 2451240, 2451270, 2451271, 2451300, 2451301, 
2451331, 2451332, 2451361, 2451362, 2451392, 24513 [...]
+
+(36) Project [codegen id : 1]
+Output [1]: [d_date_sk#7]
+Input [3]: [d_date_sk#7, d_year#28, d_dom#29]
 
-(35) Exchange
-Input [6]: [c_last_name#27, c_first_name#26, c_salutation#25, 
c_preferred_cust_flag#28, ss_ticket_number#4, cnt#22]
-Arguments: rangepartitioning(cnt#22 DESC NULLS LAST, 5), ENSURE_REQUIREMENTS, 
[id=#29]
+(37) BroadcastExchange
+Input [1]: [d_date_sk#7]
+Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as 
bigint)),false), [id=#30]
 
-(36) Sort [codegen id : 7]
-Input [6]: [c_last_name#27, c_first_name#26, c_salutation#25, 
c_preferred_cust_flag#28, ss_ticket_number#4, cnt#22]
-Arguments: [cnt#22 DESC NULLS LAST], true, 0
 
diff --git 
a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-modified/q73.sf100/simplified.txt
 
b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-modified/q73.sf100/simplified.txt
index 91deecb..025e261 100644
--- 
a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-modified/q73.sf100/simplified.txt
+++ 
b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-modified/q73.sf100/simplified.txt
@@ -16,38 +16,40 @@ WholeStageCodegen (7)
                               HashAggregate [ss_ticket_number,ss_customer_sk] 
[count,count]
                                 Project [ss_customer_sk,ss_ticket_number]
                                   BroadcastHashJoin [ss_hdemo_sk,hd_demo_sk]
-                                    InputAdapter
-                                      BroadcastExchange #4
-                                        WholeStageCodegen (3)
-                                          Project 
[ss_customer_sk,ss_hdemo_sk,ss_ticket_number]
-                                            BroadcastHashJoin 
[ss_store_sk,s_store_sk]
-                                              InputAdapter
-                                                BroadcastExchange #5
-                                                  WholeStageCodegen (2)
-                                                    Project 
[ss_customer_sk,ss_hdemo_sk,ss_store_sk,ss_ticket_number]
-                                                      BroadcastHashJoin 
[ss_sold_date_sk,d_date_sk]
-                                                        InputAdapter
-                                                          BroadcastExchange #6
-                                                            WholeStageCodegen 
(1)
-                                                              Filter 
[ss_store_sk,ss_hdemo_sk,ss_customer_sk]
-                                                                ColumnarToRow
-                                                                  InputAdapter
-                                                                    Scan 
parquet default.store_sales 
[ss_customer_sk,ss_hdemo_sk,ss_store_sk,ss_ticket_number,ss_sold_date_sk]
-                                                        Project [d_date_sk]
-                                                          Filter 
[d_dom,d_year,d_date_sk]
-                                                            ColumnarToRow
-                                                              InputAdapter
-                                                                Scan parquet 
default.date_dim [d_date_sk,d_year,d_dom]
+                                    Project 
[ss_customer_sk,ss_hdemo_sk,ss_ticket_number]
+                                      BroadcastHashJoin 
[ss_store_sk,s_store_sk]
+                                        Project 
[ss_customer_sk,ss_hdemo_sk,ss_store_sk,ss_ticket_number]
+                                          BroadcastHashJoin 
[ss_sold_date_sk,d_date_sk]
+                                            Filter 
[ss_store_sk,ss_hdemo_sk,ss_customer_sk]
+                                              ColumnarToRow
+                                                InputAdapter
+                                                  Scan parquet 
default.store_sales 
[ss_customer_sk,ss_hdemo_sk,ss_store_sk,ss_ticket_number,ss_sold_date_sk]
+                                                    SubqueryBroadcast 
[d_date_sk] #1
+                                                      BroadcastExchange #4
+                                                        WholeStageCodegen (1)
+                                                          Project [d_date_sk]
+                                                            Filter 
[d_dom,d_year,d_date_sk]
+                                                              ColumnarToRow
+                                                                InputAdapter
+                                                                  Scan parquet 
default.date_dim [d_date_sk,d_year,d_dom]
+                                            InputAdapter
+                                              ReusedExchange [d_date_sk] #4
+                                        InputAdapter
+                                          BroadcastExchange #5
+                                            WholeStageCodegen (2)
                                               Project [s_store_sk]
                                                 Filter [s_county,s_store_sk]
                                                   ColumnarToRow
                                                     InputAdapter
                                                       Scan parquet 
default.store [s_store_sk,s_county]
-                                    Project [hd_demo_sk]
-                                      Filter 
[hd_vehicle_count,hd_dep_count,hd_buy_potential,hd_demo_sk]
-                                        ColumnarToRow
-                                          InputAdapter
-                                            Scan parquet 
default.household_demographics 
[hd_demo_sk,hd_buy_potential,hd_dep_count,hd_vehicle_count]
+                                    InputAdapter
+                                      BroadcastExchange #6
+                                        WholeStageCodegen (3)
+                                          Project [hd_demo_sk]
+                                            Filter 
[hd_vehicle_count,hd_dep_count,hd_buy_potential,hd_demo_sk]
+                                              ColumnarToRow
+                                                InputAdapter
+                                                  Scan parquet 
default.household_demographics 
[hd_demo_sk,hd_buy_potential,hd_dep_count,hd_vehicle_count]
               Filter [c_customer_sk]
                 ColumnarToRow
                   InputAdapter

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to