This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b8cb9d552aa [SPARK-43402][SQL] FileSourceScanExec supports push down data filter with scalar subquery
b8cb9d552aa is described below

commit b8cb9d552aaaa6d6d8211555d3d00fff1e394015
Author: ulysses-you <ulyssesyo...@gmail.com>
AuthorDate: Mon Jul 31 10:14:58 2023 +0800

    [SPARK-43402][SQL] FileSourceScanExec supports push down data filter with scalar subquery
    
    ### What changes were proposed in this pull request?
    
    Scalar subqueries can be pushed down as data filters at runtime, since we
    always execute subqueries first. Ideally, we can rewrite `ScalarSubquery`
    to `Literal` before pushing down the filter. The main reason this was not
    supported before is that `ReuseSubquery` used to be ineffective, see
    https://github.com/apache/spark/pull/22518. That is no longer an issue.
    
    For example:
    ```sql
    SELECT * FROM t1 WHERE c1 > (SELECT min(c2) FROM t2)
    ```
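
    The rewrite itself is a small transform. A minimal sketch of the idea
    (mirroring the new `dynamicallyPushedDownFilters` in `FileSourceScanLike`
    below; `toLiteral` requires the subquery to be materialized):

    ```scala
    // Once the subquery has executed, its result can stand in for the
    // expression, so the filter translates like any other literal predicate.
    val normalized = dataFilters.map(_.transform {
      case s: execution.ScalarSubquery => s.toLiteral
    })
    val filters = translatePushedDownFilters(normalized)
    ```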
    
    ### Why are the changes needed?
    
    Improve performance when data filters contain scalar subqueries.
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    Added new tests, mainly in `SubquerySuite`.
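
    A hedged sketch of the kind of check such a test can make (names and the
    expected rows are illustrative, not the exact assertions added):

    ```scala
    // Illustrative only: the data filter carries a scalar subquery, and the
    // physical plan still contains a regular file scan that the rewritten
    // (literal) predicate can be pushed into at runtime.
    val df = spark.sql("SELECT * FROM t1 WHERE c1 > (SELECT min(c2) FROM t2)")
    val scan = df.queryExecution.sparkPlan.collectFirst {
      case s: org.apache.spark.sql.execution.FileSourceScanExec => s
    }
    assert(scan.isDefined)
    checkAnswer(df, expectedRows) // `expectedRows` is a placeholder
    ```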
    
    Closes #41088 from ulysses-you/SPARK-43402.
    
    Authored-by: ulysses-you <ulyssesyo...@gmail.com>
    Signed-off-by: Xiduo You <ulysses...@apache.org>
---
 .../spark/sql/execution/DataSourceScanExec.scala   | 24 +++++++--
 .../execution/datasources/FileSourceStrategy.scala | 18 ++++---
 .../org/apache/spark/sql/execution/subquery.scala  |  6 ++-
 .../sql-tests/results/explain-aqe.sql.out          | 58 ++++++++++++++++++++++
 .../sql-tests/results/explain-cbo.sql.out          | 12 ++++-
 .../resources/sql-tests/results/explain.sql.out    | 22 +++++---
 .../approved-plans-v1_4/q14b.sf100/explain.txt     | 22 ++++----
 .../approved-plans-v1_4/q14b.sf100/simplified.txt  | 30 +++++------
 .../approved-plans-v1_4/q14b/explain.txt           | 22 ++++----
 .../approved-plans-v1_4/q14b/simplified.txt        | 30 +++++------
 .../approved-plans-v1_4/q54.sf100/explain.txt      | 10 ++--
 .../approved-plans-v1_4/q54.sf100/simplified.txt   | 50 ++++++++++---------
 .../approved-plans-v1_4/q54/explain.txt            | 10 ++--
 .../approved-plans-v1_4/q54/simplified.txt         | 50 ++++++++++---------
 .../approved-plans-v1_4/q58.sf100/explain.txt      | 10 ++--
 .../approved-plans-v1_4/q58.sf100/simplified.txt   | 15 +++---
 .../approved-plans-v1_4/q58/explain.txt            | 10 ++--
 .../approved-plans-v1_4/q58/simplified.txt         | 15 +++---
 .../approved-plans-v1_4/q6.sf100/explain.txt       |  6 ++-
 .../approved-plans-v1_4/q6.sf100/simplified.txt    | 25 +++++-----
 .../approved-plans-v1_4/q6/explain.txt             |  6 ++-
 .../approved-plans-v1_4/q6/simplified.txt          | 25 +++++-----
 .../approved-plans-v2_7/q14.sf100/explain.txt      | 22 ++++----
 .../approved-plans-v2_7/q14.sf100/simplified.txt   | 30 +++++------
 .../approved-plans-v2_7/q14/explain.txt            | 22 ++++----
 .../approved-plans-v2_7/q14/simplified.txt         | 30 +++++------
 .../approved-plans-v2_7/q6.sf100/explain.txt       |  6 ++-
 .../approved-plans-v2_7/q6.sf100/simplified.txt    | 25 +++++-----
 .../approved-plans-v2_7/q6/explain.txt             |  6 ++-
 .../approved-plans-v2_7/q6/simplified.txt          | 25 +++++-----
 .../resources/tpch-plan-stability/q22/explain.txt  |  6 ++-
 .../tpch-plan-stability/q22/simplified.txt         | 25 +++++-----
 .../spark/sql/DynamicPartitionPruningSuite.scala   |  3 +-
 .../scala/org/apache/spark/sql/SubquerySuite.scala | 45 ++++++++++++-----
 34 files changed, 452 insertions(+), 269 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 6375cdacaa0..a739fa40c71 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.catalyst.util.{truncatedString, CaseInsensitiveMap}
 import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
 import org.apache.spark.sql.execution.datasources.v2.PushedDownOperators
@@ -370,8 +371,7 @@ trait FileSourceScanLike extends DataSourceScanExec {
     }
   }
 
-  @transient
-  protected lazy val pushedDownFilters = {
+  private def translatePushedDownFilters(dataFilters: Seq[Expression]): Seq[Filter] = {
     val supportNestedPredicatePushdown = DataSourceUtils.supportNestedPredicatePushdown(relation)
     // `dataFilters` should not include any constant metadata col filters
     // because the metadata struct has been flatted in FileSourceStrategy
@@ -383,6 +383,24 @@ trait FileSourceScanLike extends DataSourceScanExec {
    }).flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown))
   }
 
+  @transient
+  protected lazy val pushedDownFilters: Seq[Filter] = translatePushedDownFilters(dataFilters)
+
+  @transient
+  protected lazy val dynamicallyPushedDownFilters: Seq[Filter] = {
+    if (dataFilters.exists(_.exists(_.isInstanceOf[execution.ScalarSubquery]))) {
+      // Replace scalar subqueries with literals so that `DataSourceStrategy.translateFilter`
+      // can translate them. The subqueries must have been materialized, since SparkPlan
+      // always executes subqueries first.
+      val normalized = dataFilters.map(_.transform {
+        case scalarSubquery: execution.ScalarSubquery => scalarSubquery.toLiteral
+      })
+      translatePushedDownFilters(normalized)
+    } else {
+      pushedDownFilters
+    }
+  }
+
   override lazy val metadata: Map[String, String] = {
     def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]")
     val location = relation.location
@@ -543,7 +561,7 @@ case class FileSourceScanExec(
         dataSchema = relation.dataSchema,
         partitionSchema = relation.partitionSchema,
         requiredSchema = requiredSchema,
-        filters = pushedDownFilters,
+        filters = dynamicallyPushedDownFilters,
         options = options,
        hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index e4bf24ad88d..5673e12927c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.ScanOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.trees.TreePattern.{PLAN_EXPRESSION, SCALAR_SUBQUERY}
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
 import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
 import org.apache.spark.sql.types.{DoubleType, FloatType, StructType}
@@ -171,13 +172,12 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging {
       val partitionKeyFilters = DataSourceStrategy.getPushedDownFilters(partitionColumns,
         normalizedFilters)
 
-      // subquery expressions are filtered out because they can't be used to prune buckets or pushed
-      // down as data filters, yet they would be executed
-      val normalizedFiltersWithoutSubqueries =
-        normalizedFilters.filterNot(SubqueryExpression.hasSubquery)
-
       val bucketSpec: Option[BucketSpec] = fsRelation.bucketSpec
       val bucketSet = if (shouldPruneBuckets(bucketSpec)) {
+        // subquery expressions are filtered out because they can't be used to prune buckets,
+        // yet they would be executed
+        val normalizedFiltersWithoutSubqueries =
+          normalizedFilters.filterNot(SubqueryExpression.hasSubquery)
         genBucketSet(normalizedFiltersWithoutSubqueries, bucketSpec.get)
       } else {
         None
@@ -189,7 +189,13 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging {
       // Partition keys are not available in the statistics of the files.
       // `dataColumns` might have partition columns, we need to filter them out.
       val dataColumnsWithoutPartitionCols = dataColumns.filterNot(partitionSet.contains)
-      val dataFilters = normalizedFiltersWithoutSubqueries.flatMap { f =>
+      // Scalar subqueries can be pushed down as data filters at runtime, since we always
+      // execute subqueries first.
+      // Pushing down a bloom filter is meaningless, so skip it.
+      val normalizedFiltersWithScalarSubqueries = normalizedFilters
+        .filterNot(e => e.containsPattern(PLAN_EXPRESSION) && !e.containsPattern(SCALAR_SUBQUERY))
+        .filterNot(_.isInstanceOf[BloomFilterMightContain])
+      val dataFilters = normalizedFiltersWithScalarSubqueries.flatMap { f =>
         if (f.references.intersect(partitionSet).nonEmpty) {
           extractPredicatesWithinOutputSet(f, AttributeSet(dataColumnsWithoutPartitionCols))
         } else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
index 656c50c8232..709402571ca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
@@ -100,8 +100,12 @@ case class ScalarSubquery(
   }
 
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
+    toLiteral.doGenCode(ctx, ev)
+  }
+
+  def toLiteral: Literal = {
     require(updated, s"$this has not finished")
-    Literal.create(result, dataType).doGenCode(ctx, ev)
+    Literal.create(result, dataType)
   }
 }
 
diff --git a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
index 3c2677c936f..44b2679f89d 100644
--- a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
@@ -505,6 +505,45 @@ Results [1]: [max(key#x)#x AS max(key)#x]
 Output [1]: [max(key)#x]
 Arguments: isFinalPlan=false
 
+Subquery:3 Hosting operator id = 4 Hosting Expression = Subquery subquery#x, [id=#x]
+AdaptiveSparkPlan (17)
++- HashAggregate (16)
+   +- Exchange (15)
+      +- HashAggregate (14)
+         +- Project (13)
+            +- Filter (12)
+               +- Scan parquet spark_catalog.default.explain_temp3 (11)
+
+
+Subquery:4 Hosting operator id = 1 Hosting Expression = Subquery subquery#x, [id=#x]
+AdaptiveSparkPlan (10)
++- HashAggregate (9)
+   +- Exchange (8)
+      +- HashAggregate (7)
+         +- Project (6)
+            +- Filter (5)
+               +- Scan parquet spark_catalog.default.explain_temp2 (4)
+
+
+Subquery:5 Hosting operator id = 5 Hosting Expression = Subquery subquery#x, [id=#x]
+AdaptiveSparkPlan (17)
++- HashAggregate (16)
+   +- Exchange (15)
+      +- HashAggregate (14)
+         +- Project (13)
+            +- Filter (12)
+               +- Scan parquet spark_catalog.default.explain_temp3 (11)
+
+
+Subquery:6 Hosting operator id = 4 Hosting Expression = Subquery subquery#x, [id=#x]
+AdaptiveSparkPlan (17)
++- HashAggregate (16)
+   +- Exchange (15)
+      +- HashAggregate (14)
+         +- Project (13)
+            +- Filter (12)
+               +- Scan parquet spark_catalog.default.explain_temp3 (11)
+
 
 -- !query
 EXPLAIN FORMATTED
@@ -636,6 +675,25 @@ Results [1]: [avg(key#x)#x AS avg(key)#x]
 Output [1]: [avg(key)#x]
 Arguments: isFinalPlan=false
 
+Subquery:3 Hosting operator id = 1 Hosting Expression = Subquery subquery#x, [id=#x]
+AdaptiveSparkPlan (10)
++- HashAggregate (9)
+   +- Exchange (8)
+      +- HashAggregate (7)
+         +- Project (6)
+            +- Filter (5)
+               +- Scan parquet spark_catalog.default.explain_temp2 (4)
+
+
+Subquery:4 Hosting operator id = 1 Hosting Expression = Subquery subquery#x, [id=#x]
+AdaptiveSparkPlan (17)
++- HashAggregate (16)
+   +- Exchange (15)
+      +- HashAggregate (14)
+         +- Project (13)
+            +- Filter (12)
+               +- Scan parquet spark_catalog.default.explain_temp3 (11)
+
 
 -- !query
 EXPLAIN FORMATTED
diff --git a/sql/core/src/test/resources/sql-tests/results/explain-cbo.sql.out b/sql/core/src/test/resources/sql-tests/results/explain-cbo.sql.out
index 7c2a954936f..634c6b93d87 100644
--- a/sql/core/src/test/resources/sql-tests/results/explain-cbo.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/explain-cbo.sql.out
@@ -74,7 +74,17 @@ AdaptiveSparkPlan isFinalPlan=false
       :                       +- Project [b#x]
       :                          +- Filter (isnotnull(a#x) AND (a#x < 100))
      :                             +- FileScan parquet spark_catalog.default.explain_temp1[a#x,b#x] Batched: true, DataFilters: [isnotnull(a#x), (a#x < 100)], Format: Parquet, Location [not included in comparison]/{warehouse_dir}/explain_temp1], PartitionFilters: [], PushedFilters: [IsNotNull(a), LessThan(a,100)], ReadSchema: struct<a:int,b:int>
-      +- FileScan parquet spark_catalog.default.explain_temp2[c#x,d#x] Batched: true, DataFilters: [isnotnull(d#x)], Format: Parquet, Location [not included in comparison]/{warehouse_dir}/explain_temp2], PartitionFilters: [], PushedFilters: [IsNotNull(d)], ReadSchema: struct<c:int,d:int>
+      +- FileScan parquet spark_catalog.default.explain_temp2[c#x,d#x] Batched: true, DataFilters: [isnotnull(d#x), (cast(d#x as bigint) > Subquery subquery#x, [id=#x])], Format: Parquet, Location [not included in comparison]/{warehouse_dir}/explain_temp2], PartitionFilters: [], PushedFilters: [IsNotNull(d)], ReadSchema: struct<c:int,d:int>
+            +- Subquery subquery#x, [id=#x]
+               +- AdaptiveSparkPlan isFinalPlan=false
+                  +- HashAggregate(keys=[], functions=[max(csales#xL)], output=[tpcds_cmax#xL])
+                     +- HashAggregate(keys=[], functions=[partial_max(csales#xL)], output=[max#xL])
+                        +- HashAggregate(keys=[], functions=[sum(b#x)], output=[csales#xL])
+                           +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=x]
+                              +- HashAggregate(keys=[], functions=[partial_sum(b#x)], output=[sum#xL])
+                                 +- Project [b#x]
+                                    +- Filter (isnotnull(a#x) AND (a#x < 100))
+                                       +- FileScan parquet spark_catalog.default.explain_temp1[a#x,b#x] Batched: true, DataFilters: [isnotnull(a#x), (a#x < 100)], Format: Parquet, Location [not included in comparison]/{warehouse_dir}/explain_temp1], PartitionFilters: [], PushedFilters: [IsNotNull(a), LessThan(a,100)], ReadSchema: struct<a:int,b:int>
 
 
 -- !query
diff --git a/sql/core/src/test/resources/sql-tests/results/explain.sql.out b/sql/core/src/test/resources/sql-tests/results/explain.sql.out
index f54c6c5e44f..0cd94abc9b3 100644
--- a/sql/core/src/test/resources/sql-tests/results/explain.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/explain.sql.out
@@ -413,11 +413,13 @@ Input [2]: [key#x, val#x]
 
 (3) Filter [codegen id : 1]
 Input [2]: [key#x, val#x]
-Condition : (((isnotnull(key#x) AND isnotnull(val#x)) AND (key#x = Subquery scalar-subquery#x, [id=#x])) AND (val#x > 3))
+Condition : (((isnotnull(key#x) AND isnotnull(val#x)) AND (key#x = ReusedSubquery Subquery scalar-subquery#x, [id=#x])) AND (val#x > 3))
 
 ===== Subqueries =====
 
-Subquery:1 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery#x, [id=#x]
+Subquery:1 Hosting operator id = 3 Hosting Expression = ReusedSubquery Subquery scalar-subquery#x, [id=#x]
+
+Subquery:2 Hosting operator id = 1 Hosting Expression = Subquery scalar-subquery#x, [id=#x]
 * HashAggregate (10)
 +- Exchange (9)
    +- * HashAggregate (8)
@@ -439,7 +441,7 @@ Input [2]: [key#x, val#x]
 
 (6) Filter [codegen id : 1]
 Input [2]: [key#x, val#x]
-Condition : (((isnotnull(key#x) AND isnotnull(val#x)) AND (key#x = Subquery scalar-subquery#x, [id=#x])) AND (val#x = 2))
+Condition : (((isnotnull(key#x) AND isnotnull(val#x)) AND (key#x = ReusedSubquery Subquery scalar-subquery#x, [id=#x])) AND (val#x = 2))
 
 (7) Project [codegen id : 1]
 Output [1]: [key#x]
@@ -463,7 +465,9 @@ Functions [1]: [max(key#x)]
 Aggregate Attributes [1]: [max(key#x)#x]
 Results [1]: [max(key#x)#x AS max(key)#x]
 
-Subquery:2 Hosting operator id = 6 Hosting Expression = Subquery scalar-subquery#x, [id=#x]
+Subquery:3 Hosting operator id = 6 Hosting Expression = ReusedSubquery Subquery scalar-subquery#x, [id=#x]
+
+Subquery:4 Hosting operator id = 4 Hosting Expression = Subquery scalar-subquery#x, [id=#x]
 * HashAggregate (17)
 +- Exchange (16)
    +- * HashAggregate (15)
@@ -541,11 +545,15 @@ Input [2]: [key#x, val#x]
 
 (3) Filter [codegen id : 1]
 Input [2]: [key#x, val#x]
-Condition : ((key#x = Subquery scalar-subquery#x, [id=#x]) OR (cast(key#x as double) = Subquery scalar-subquery#x, [id=#x]))
+Condition : ((key#x = ReusedSubquery Subquery scalar-subquery#x, [id=#x]) OR (cast(key#x as double) = ReusedSubquery Subquery scalar-subquery#x, [id=#x]))
 
 ===== Subqueries =====
 
-Subquery:1 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery#x, [id=#x]
+Subquery:1 Hosting operator id = 3 Hosting Expression = ReusedSubquery Subquery scalar-subquery#x, [id=#x]
+
+Subquery:2 Hosting operator id = 3 Hosting Expression = ReusedSubquery Subquery scalar-subquery#x, [id=#x]
+
+Subquery:3 Hosting operator id = 1 Hosting Expression = Subquery scalar-subquery#x, [id=#x]
 * HashAggregate (10)
 +- Exchange (9)
    +- * HashAggregate (8)
@@ -591,7 +599,7 @@ Functions [1]: [max(key#x)]
 Aggregate Attributes [1]: [max(key#x)#x]
 Results [1]: [max(key#x)#x AS max(key)#x]
 
-Subquery:2 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery#x, [id=#x]
+Subquery:4 Hosting operator id = 1 Hosting Expression = Subquery scalar-subquery#x, [id=#x]
 * HashAggregate (17)
 +- Exchange (16)
    +- * HashAggregate (15)
diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14b.sf100/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14b.sf100/explain.txt
index 9af375dd69f..16bdfb10416 100644
--- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14b.sf100/explain.txt
+++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14b.sf100/explain.txt
@@ -656,7 +656,7 @@ Input [2]: [d_date_sk#36, d_week_seq#100]
 
 (112) Filter [codegen id : 1]
 Input [2]: [d_date_sk#36, d_week_seq#100]
-Condition : ((isnotnull(d_week_seq#100) AND (d_week_seq#100 = Subquery scalar-subquery#101, [id=#102])) AND isnotnull(d_date_sk#36))
+Condition : ((isnotnull(d_week_seq#100) AND (d_week_seq#100 = ReusedSubquery Subquery scalar-subquery#101, [id=#102])) AND isnotnull(d_date_sk#36))
 
 (113) Project [codegen id : 1]
 Output [1]: [d_date_sk#36]
@@ -666,7 +666,9 @@ Input [2]: [d_date_sk#36, d_week_seq#100]
 Input [1]: [d_date_sk#36]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=15]
 
-Subquery:6 Hosting operator id = 112 Hosting Expression = Subquery scalar-subquery#101, [id=#102]
+Subquery:6 Hosting operator id = 112 Hosting Expression = ReusedSubquery Subquery scalar-subquery#101, [id=#102]
+
+Subquery:7 Hosting operator id = 110 Hosting Expression = Subquery scalar-subquery#101, [id=#102]
 * Project (118)
 +- * Filter (117)
    +- * ColumnarToRow (116)
@@ -691,7 +693,7 @@ Condition : (((((isnotnull(d_year#104) AND isnotnull(d_moy#105)) AND isnotnull(d
 Output [1]: [d_week_seq#103]
 Input [4]: [d_week_seq#103, d_year#104, d_moy#105, d_dom#106]
 
-Subquery:7 Hosting operator id = 7 Hosting Expression = ss_sold_date_sk#11 IN dynamicpruning#12
+Subquery:8 Hosting operator id = 7 Hosting Expression = ss_sold_date_sk#11 IN dynamicpruning#12
 BroadcastExchange (123)
 +- * Project (122)
    +- * Filter (121)
@@ -721,13 +723,13 @@ Input [2]: [d_date_sk#13, d_year#107]
 Input [1]: [d_date_sk#13]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=16]
 
-Subquery:8 Hosting operator id = 18 Hosting Expression = cs_sold_date_sk#19 IN dynamicpruning#12
+Subquery:9 Hosting operator id = 18 Hosting Expression = cs_sold_date_sk#19 IN dynamicpruning#12
 
-Subquery:9 Hosting operator id = 41 Hosting Expression = ws_sold_date_sk#29 IN dynamicpruning#12
+Subquery:10 Hosting operator id = 41 Hosting Expression = ws_sold_date_sk#29 IN dynamicpruning#12
 
-Subquery:10 Hosting operator id = 87 Hosting Expression = ReusedSubquery Subquery scalar-subquery#52, [id=#53]
+Subquery:11 Hosting operator id = 87 Hosting Expression = ReusedSubquery Subquery scalar-subquery#52, [id=#53]
 
-Subquery:11 Hosting operator id = 73 Hosting Expression = ss_sold_date_sk#57 IN dynamicpruning#58
+Subquery:12 Hosting operator id = 73 Hosting Expression = ss_sold_date_sk#57 IN dynamicpruning#58
 BroadcastExchange (128)
 +- * Project (127)
    +- * Filter (126)
@@ -747,7 +749,7 @@ Input [2]: [d_date_sk#60, d_week_seq#108]
 
 (126) Filter [codegen id : 1]
 Input [2]: [d_date_sk#60, d_week_seq#108]
-Condition : ((isnotnull(d_week_seq#108) AND (d_week_seq#108 = Subquery scalar-subquery#109, [id=#110])) AND isnotnull(d_date_sk#60))
+Condition : ((isnotnull(d_week_seq#108) AND (d_week_seq#108 = ReusedSubquery Subquery scalar-subquery#109, [id=#110])) AND isnotnull(d_date_sk#60))
 
 (127) Project [codegen id : 1]
 Output [1]: [d_date_sk#60]
@@ -757,7 +759,9 @@ Input [2]: [d_date_sk#60, d_week_seq#108]
 Input [1]: [d_date_sk#60]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=17]
 
-Subquery:12 Hosting operator id = 126 Hosting Expression = Subquery scalar-subquery#109, [id=#110]
+Subquery:13 Hosting operator id = 126 Hosting Expression = ReusedSubquery Subquery scalar-subquery#109, [id=#110]
+
+Subquery:14 Hosting operator id = 124 Hosting Expression = Subquery scalar-subquery#109, [id=#110]
 * Project (132)
 +- * Filter (131)
    +- * ColumnarToRow (130)
diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14b.sf100/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14b.sf100/simplified.txt
index edd34864986..4db035858b2 100644
--- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14b.sf100/simplified.txt
+++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14b.sf100/simplified.txt
@@ -57,16 +57,17 @@ TakeOrderedAndProject [i_brand_id,i_class_id,i_category_id,channel,sales,number_
                                         WholeStageCodegen (1)
                                           Project [d_date_sk]
                                             Filter [d_week_seq,d_date_sk]
-                                              Subquery #2
-                                                WholeStageCodegen (1)
-                                                  Project [d_week_seq]
-                                                    Filter [d_year,d_moy,d_dom]
-                                                      ColumnarToRow
-                                                        InputAdapter
-                                                          Scan parquet spark_catalog.default.date_dim [d_week_seq,d_year,d_moy,d_dom]
+                                              ReusedSubquery [d_week_seq] #2
                                               ColumnarToRow
                                                 InputAdapter
                                                   Scan parquet spark_catalog.default.date_dim [d_date_sk,d_week_seq]
+                                                    Subquery #2
+                                                      WholeStageCodegen (1)
+                                                        Project [d_week_seq]
+                                                          Filter [d_year,d_moy,d_dom]
+                                                            ColumnarToRow
+                                                              InputAdapter
+                                                                Scan parquet spark_catalog.default.date_dim [d_week_seq,d_year,d_moy,d_dom]
                             InputAdapter
                               BroadcastExchange #3
                                 WholeStageCodegen (17)
@@ -202,16 +203,17 @@ TakeOrderedAndProject [i_brand_id,i_class_id,i_category_id,channel,sales,number_
                                               WholeStageCodegen (1)
                                                 Project [d_date_sk]
                                                   Filter [d_week_seq,d_date_sk]
-                                                    Subquery #6
-                                                      WholeStageCodegen (1)
-                                                        Project [d_week_seq]
-                                                          Filter [d_year,d_moy,d_dom]
-                                                            ColumnarToRow
-                                                              InputAdapter
-                                                                Scan parquet spark_catalog.default.date_dim [d_week_seq,d_year,d_moy,d_dom]
+                                                    ReusedSubquery [d_week_seq] #6
                                                     ColumnarToRow
                                                       InputAdapter
                                                         Scan parquet spark_catalog.default.date_dim [d_date_sk,d_week_seq]
+                                                          Subquery #6
+                                                            WholeStageCodegen (1)
+                                                              Project [d_week_seq]
+                                                                Filter [d_year,d_moy,d_dom]
+                                                                  ColumnarToRow
+                                                                    InputAdapter
+                                                                      Scan parquet spark_catalog.default.date_dim [d_week_seq,d_year,d_moy,d_dom]
                                   InputAdapter
                                     ReusedExchange [ss_item_sk] #3
                                 InputAdapter
diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14b/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14b/explain.txt
index 4239018abff..cc8b88f3adc 100644
--- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14b/explain.txt
+++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14b/explain.txt
@@ -626,7 +626,7 @@ Input [2]: [d_date_sk#40, d_week_seq#100]
 
 (106) Filter [codegen id : 1]
 Input [2]: [d_date_sk#40, d_week_seq#100]
-Condition : ((isnotnull(d_week_seq#100) AND (d_week_seq#100 = Subquery scalar-subquery#101, [id=#102])) AND isnotnull(d_date_sk#40))
+Condition : ((isnotnull(d_week_seq#100) AND (d_week_seq#100 = ReusedSubquery Subquery scalar-subquery#101, [id=#102])) AND isnotnull(d_date_sk#40))
 
 (107) Project [codegen id : 1]
 Output [1]: [d_date_sk#40]
@@ -636,7 +636,9 @@ Input [2]: [d_date_sk#40, d_week_seq#100]
 Input [1]: [d_date_sk#40]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=13]
 
-Subquery:6 Hosting operator id = 106 Hosting Expression = Subquery scalar-subquery#101, [id=#102]
+Subquery:6 Hosting operator id = 106 Hosting Expression = ReusedSubquery Subquery scalar-subquery#101, [id=#102]
+
+Subquery:7 Hosting operator id = 104 Hosting Expression = Subquery scalar-subquery#101, [id=#102]
 * Project (112)
 +- * Filter (111)
    +- * ColumnarToRow (110)
@@ -661,7 +663,7 @@ Condition : (((((isnotnull(d_year#104) AND isnotnull(d_moy#105)) AND isnotnull(d
 Output [1]: [d_week_seq#103]
 Input [4]: [d_week_seq#103, d_year#104, d_moy#105, d_dom#106]
 
-Subquery:7 Hosting operator id = 7 Hosting Expression = ss_sold_date_sk#11 IN dynamicpruning#12
+Subquery:8 Hosting operator id = 7 Hosting Expression = ss_sold_date_sk#11 IN dynamicpruning#12
 BroadcastExchange (117)
 +- * Project (116)
    +- * Filter (115)
@@ -691,13 +693,13 @@ Input [2]: [d_date_sk#24, d_year#107]
 Input [1]: [d_date_sk#24]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=14]
 
-Subquery:8 Hosting operator id = 13 Hosting Expression = cs_sold_date_sk#18 IN dynamicpruning#12
+Subquery:9 Hosting operator id = 13 Hosting Expression = cs_sold_date_sk#18 IN dynamicpruning#12
 
-Subquery:9 Hosting operator id = 36 Hosting Expression = ws_sold_date_sk#29 IN dynamicpruning#12
+Subquery:10 Hosting operator id = 36 Hosting Expression = ws_sold_date_sk#29 IN dynamicpruning#12
 
-Subquery:10 Hosting operator id = 81 Hosting Expression = ReusedSubquery Subquery scalar-subquery#52, [id=#53]
+Subquery:11 Hosting operator id = 81 Hosting Expression = ReusedSubquery Subquery scalar-subquery#52, [id=#53]
 
-Subquery:11 Hosting operator id = 67 Hosting Expression = ss_sold_date_sk#57 IN dynamicpruning#58
+Subquery:12 Hosting operator id = 67 Hosting Expression = ss_sold_date_sk#57 IN dynamicpruning#58
 BroadcastExchange (122)
 +- * Project (121)
    +- * Filter (120)
@@ -717,7 +719,7 @@ Input [2]: [d_date_sk#64, d_week_seq#108]
 
 (120) Filter [codegen id : 1]
 Input [2]: [d_date_sk#64, d_week_seq#108]
-Condition : ((isnotnull(d_week_seq#108) AND (d_week_seq#108 = Subquery scalar-subquery#109, [id=#110])) AND isnotnull(d_date_sk#64))
+Condition : ((isnotnull(d_week_seq#108) AND (d_week_seq#108 = ReusedSubquery Subquery scalar-subquery#109, [id=#110])) AND isnotnull(d_date_sk#64))
 
 (121) Project [codegen id : 1]
 Output [1]: [d_date_sk#64]
@@ -727,7 +729,9 @@ Input [2]: [d_date_sk#64, d_week_seq#108]
 Input [1]: [d_date_sk#64]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=15]
 
-Subquery:12 Hosting operator id = 120 Hosting Expression = Subquery scalar-subquery#109, [id=#110]
+Subquery:13 Hosting operator id = 120 Hosting Expression = ReusedSubquery Subquery scalar-subquery#109, [id=#110]
+
+Subquery:14 Hosting operator id = 118 Hosting Expression = Subquery scalar-subquery#109, [id=#110]
 * Project (126)
 +- * Filter (125)
    +- * ColumnarToRow (124)
diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14b/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14b/simplified.txt
index 8d8dcccd5d7..b103e79d118 100644
--- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14b/simplified.txt
+++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14b/simplified.txt
@@ -57,16 +57,17 @@ TakeOrderedAndProject [i_brand_id,i_class_id,i_category_id,channel,sales,number_
                                         WholeStageCodegen (1)
                                           Project [d_date_sk]
                                             Filter [d_week_seq,d_date_sk]
-                                              Subquery #2
-                                                WholeStageCodegen (1)
-                                                  Project [d_week_seq]
-                                                    Filter [d_year,d_moy,d_dom]
-                                                      ColumnarToRow
-                                                        InputAdapter
-                                                          Scan parquet spark_catalog.default.date_dim [d_week_seq,d_year,d_moy,d_dom]
+                                              ReusedSubquery [d_week_seq] #2
                                               ColumnarToRow
                                                 InputAdapter
                                                   Scan parquet spark_catalog.default.date_dim [d_date_sk,d_week_seq]
+                                                    Subquery #2
+                                                      WholeStageCodegen (1)
+                                                        Project [d_week_seq]
+                                                          Filter [d_year,d_moy,d_dom]
+                                                            ColumnarToRow
+                                                              InputAdapter
+                                                                Scan parquet spark_catalog.default.date_dim [d_week_seq,d_year,d_moy,d_dom]
                             InputAdapter
                               BroadcastExchange #3
                                 WholeStageCodegen (11)
@@ -184,16 +185,17 @@ TakeOrderedAndProject [i_brand_id,i_class_id,i_category_id,channel,sales,number_
                                               WholeStageCodegen (1)
                                                 Project [d_date_sk]
                                                   Filter [d_week_seq,d_date_sk]
-                                                    Subquery #6
-                                                      WholeStageCodegen (1)
-                                                        Project [d_week_seq]
-                                                          Filter [d_year,d_moy,d_dom]
-                                                            ColumnarToRow
-                                                              InputAdapter
-                                                                Scan parquet spark_catalog.default.date_dim [d_week_seq,d_year,d_moy,d_dom]
+                                                    ReusedSubquery [d_week_seq] #6
                                                     ColumnarToRow
                                                       InputAdapter
                                                         Scan parquet spark_catalog.default.date_dim [d_date_sk,d_week_seq]
+                                                          Subquery #6
+                                                            WholeStageCodegen (1)
+                                                              Project [d_week_seq]
+                                                                Filter [d_year,d_moy,d_dom]
+                                                                  ColumnarToRow
+                                                                    InputAdapter
+                                                                      Scan parquet spark_catalog.default.date_dim [d_week_seq,d_year,d_moy,d_dom]
                                   InputAdapter
                                     ReusedExchange [ss_item_sk] #3
                                 InputAdapter
diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q54.sf100/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q54.sf100/explain.txt
index d7e41c7331b..19643cccab6 100644
--- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q54.sf100/explain.txt
+++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q54.sf100/explain.txt
@@ -395,7 +395,7 @@ Input [2]: [d_date_sk#29, d_month_seq#41]
 
 (67) Filter [codegen id : 1]
 Input [2]: [d_date_sk#29, d_month_seq#41]
-Condition : (((isnotnull(d_month_seq#41) AND (d_month_seq#41 >= Subquery scalar-subquery#42, [id=#43])) AND (d_month_seq#41 <= Subquery scalar-subquery#44, [id=#45])) AND isnotnull(d_date_sk#29))
+Condition : (((isnotnull(d_month_seq#41) AND (d_month_seq#41 >= ReusedSubquery Subquery scalar-subquery#42, [id=#43])) AND (d_month_seq#41 <= ReusedSubquery Subquery scalar-subquery#44, [id=#45])) AND isnotnull(d_date_sk#29))
 
 (68) Project [codegen id : 1]
 Output [1]: [d_date_sk#29]
@@ -405,7 +405,11 @@ Input [2]: [d_date_sk#29, d_month_seq#41]
 Input [1]: [d_date_sk#29]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=9]
 
-Subquery:4 Hosting operator id = 67 Hosting Expression = Subquery scalar-subquery#42, [id=#43]
+Subquery:4 Hosting operator id = 67 Hosting Expression = ReusedSubquery Subquery scalar-subquery#42, [id=#43]
+
+Subquery:5 Hosting operator id = 67 Hosting Expression = ReusedSubquery Subquery scalar-subquery#44, [id=#45]
+
+Subquery:6 Hosting operator id = 65 Hosting Expression = Subquery scalar-subquery#42, [id=#43]
 * HashAggregate (76)
 +- Exchange (75)
    +- * HashAggregate (74)
@@ -451,7 +455,7 @@ Functions: []
 Aggregate Attributes: []
 Results [1]: [(d_month_seq + 1)#49]
 
-Subquery:5 Hosting operator id = 67 Hosting Expression = Subquery scalar-subquery#44, [id=#45]
+Subquery:7 Hosting operator id = 65 Hosting Expression = Subquery scalar-subquery#44, [id=#45]
 * HashAggregate (83)
 +- Exchange (82)
    +- * HashAggregate (81)
diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q54.sf100/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q54.sf100/simplified.txt
index a16d053d2c9..18921ceebf2 100644
--- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q54.sf100/simplified.txt
+++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q54.sf100/simplified.txt
@@ -104,32 +104,34 @@ TakeOrderedAndProject [segment,num_customers,segment_base]
                                                   WholeStageCodegen (1)
                                                     Project [d_date_sk]
                                                       Filter [d_month_seq,d_date_sk]
-                                                        Subquery #3
-                                                          WholeStageCodegen (2)
-                                                            HashAggregate [(d_month_seq + 1)]
-                                                              InputAdapter
-                                                                Exchange [(d_month_seq + 1)] #10
-                                                                  WholeStageCodegen (1)
-                                                                    HashAggregate [(d_month_seq + 1)]
-                                                                      Project [d_month_seq]
-                                                                        Filter [d_year,d_moy]
-                                                                          ColumnarToRow
-                                                                            InputAdapter
-                                                                              Scan parquet spark_catalog.default.date_dim [d_month_seq,d_year,d_moy]
-                                                        Subquery #4
-                                                          WholeStageCodegen (2)
-                                                            HashAggregate [(d_month_seq + 3)]
-                                                              InputAdapter
-                                                                Exchange [(d_month_seq + 3)] #11
-                                                                  WholeStageCodegen (1)
-                                                                    HashAggregate [(d_month_seq + 3)]
-                                                                      Project [d_month_seq]
-                                                                        Filter [d_year,d_moy]
-                                                                          ColumnarToRow
-                                                                            InputAdapter
-                                                                              Scan parquet spark_catalog.default.date_dim [d_month_seq,d_year,d_moy]
+                                                        ReusedSubquery [(d_month_seq + 1)] #3
+                                                        ReusedSubquery [(d_month_seq + 3)] #4
                                                         ColumnarToRow
                                                           InputAdapter
                                                             Scan parquet spark_catalog.default.date_dim [d_date_sk,d_month_seq]
+                                                              Subquery #3
+                                                                WholeStageCodegen (2)
+                                                                  HashAggregate [(d_month_seq + 1)]
+                                                                    InputAdapter
+                                                                      Exchange [(d_month_seq + 1)] #10
+                                                                        WholeStageCodegen (1)
+                                                                          HashAggregate [(d_month_seq + 1)]
+                                                                            Project [d_month_seq]
+                                                                              Filter [d_year,d_moy]
+                                                                                ColumnarToRow
+                                                                                  InputAdapter
+                                                                                    Scan parquet spark_catalog.default.date_dim [d_month_seq,d_year,d_moy]
+                                                              Subquery #4
+                                                                WholeStageCodegen (2)
+                                                                  HashAggregate [(d_month_seq + 3)]
+                                                                    InputAdapter
+                                                                      Exchange [(d_month_seq + 3)] #11
+                                                                        WholeStageCodegen (1)
+                                                                          HashAggregate [(d_month_seq + 3)]
+                                                                            Project [d_month_seq]
+                                                                              Filter [d_year,d_moy]
+                                                                                ColumnarToRow
+                                                                                  InputAdapter
+                                                                                    Scan parquet spark_catalog.default.date_dim [d_month_seq,d_year,d_moy]
                                       InputAdapter
                                         ReusedExchange [d_date_sk] #9
diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q54/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q54/explain.txt
index e5bf4132c34..cefaff0c09d 100644
--- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q54/explain.txt
+++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q54/explain.txt
@@ -380,7 +380,7 @@ Input [2]: [d_date_sk#29, d_month_seq#41]
 
 (64) Filter [codegen id : 1]
 Input [2]: [d_date_sk#29, d_month_seq#41]
-Condition : (((isnotnull(d_month_seq#41) AND (d_month_seq#41 >= Subquery scalar-subquery#42, [id=#43])) AND (d_month_seq#41 <= Subquery scalar-subquery#44, [id=#45])) AND isnotnull(d_date_sk#29))
+Condition : (((isnotnull(d_month_seq#41) AND (d_month_seq#41 >= ReusedSubquery Subquery scalar-subquery#42, [id=#43])) AND (d_month_seq#41 <= ReusedSubquery Subquery scalar-subquery#44, [id=#45])) AND isnotnull(d_date_sk#29))
 
 (65) Project [codegen id : 1]
 Output [1]: [d_date_sk#29]
@@ -390,7 +390,11 @@ Input [2]: [d_date_sk#29, d_month_seq#41]
 Input [1]: [d_date_sk#29]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=10]
 
-Subquery:4 Hosting operator id = 64 Hosting Expression = Subquery scalar-subquery#42, [id=#43]
+Subquery:4 Hosting operator id = 64 Hosting Expression = ReusedSubquery Subquery scalar-subquery#42, [id=#43]
+
+Subquery:5 Hosting operator id = 64 Hosting Expression = ReusedSubquery Subquery scalar-subquery#44, [id=#45]
+
+Subquery:6 Hosting operator id = 62 Hosting Expression = Subquery scalar-subquery#42, [id=#43]
 * HashAggregate (73)
 +- Exchange (72)
    +- * HashAggregate (71)
@@ -436,7 +440,7 @@ Functions: []
 Aggregate Attributes: []
 Results [1]: [(d_month_seq + 1)#49]
 
-Subquery:5 Hosting operator id = 64 Hosting Expression = Subquery scalar-subquery#44, [id=#45]
+Subquery:7 Hosting operator id = 62 Hosting Expression = Subquery scalar-subquery#44, [id=#45]
 * HashAggregate (80)
 +- Exchange (79)
    +- * HashAggregate (78)
diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q54/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q54/simplified.txt
index 4c7809facbd..9547f9ba404 100644
--- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q54/simplified.txt
+++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q54/simplified.txt
@@ -81,33 +81,35 @@ TakeOrderedAndProject [segment,num_customers,segment_base]
                                                           WholeStageCodegen (1)
                                                             Project [d_date_sk]
                                                               Filter [d_month_seq,d_date_sk]
-                                                                Subquery #3
-                                                                  WholeStageCodegen (2)
-                                                                    HashAggregate [(d_month_seq + 1)]
-                                                                      InputAdapter
-                                                                        Exchange [(d_month_seq + 1)] #9
-                                                                          WholeStageCodegen (1)
-                                                                            HashAggregate [(d_month_seq + 1)]
-                                                                              Project [d_month_seq]
-                                                                                Filter [d_year,d_moy]
-                                                                                  ColumnarToRow
-                                                                                    InputAdapter
-                                                                                      Scan parquet spark_catalog.default.date_dim [d_month_seq,d_year,d_moy]
-                                                                Subquery #4
-                                                                  WholeStageCodegen (2)
-                                                                    HashAggregate [(d_month_seq + 3)]
-                                                                      InputAdapter
-                                                                        Exchange [(d_month_seq + 3)] #10
-                                                                          WholeStageCodegen (1)
-                                                                            HashAggregate [(d_month_seq + 3)]
-                                                                              Project [d_month_seq]
-                                                                                Filter [d_year,d_moy]
-                                                                                  ColumnarToRow
-                                                                                    InputAdapter
-                                                                                      Scan parquet spark_catalog.default.date_dim [d_month_seq,d_year,d_moy]
+                                                                ReusedSubquery [(d_month_seq + 1)] #3
+                                                                ReusedSubquery [(d_month_seq + 3)] #4
                                                                 ColumnarToRow
                                                                   InputAdapter
                                                                     Scan parquet spark_catalog.default.date_dim [d_date_sk,d_month_seq]
+                                                                      Subquery #3
+                                                                        WholeStageCodegen (2)
+                                                                          HashAggregate [(d_month_seq + 1)]
+                                                                            InputAdapter
+                                                                              Exchange [(d_month_seq + 1)] #9
+                                                                                WholeStageCodegen (1)
+                                                                                  HashAggregate [(d_month_seq + 1)]
+                                                                                    Project [d_month_seq]
+                                                                                      Filter [d_year,d_moy]
+                                                                                        ColumnarToRow
+                                                                                          InputAdapter
+                                                                                            Scan parquet spark_catalog.default.date_dim [d_month_seq,d_year,d_moy]
+                                                                      Subquery #4
+                                                                        WholeStageCodegen (2)
+                                                                          HashAggregate [(d_month_seq + 3)]
+                                                                            InputAdapter
+                                                                              Exchange [(d_month_seq + 3)] #10
+                                                                                WholeStageCodegen (1)
+                                                                                  HashAggregate [(d_month_seq + 3)]
+                                                                                    Project [d_month_seq]
+                                                                                      Filter [d_year,d_moy]
+                                                                                        ColumnarToRow
+                                                                                          InputAdapter
+                                                                                            Scan parquet spark_catalog.default.date_dim [d_month_seq,d_year,d_moy]
                                     InputAdapter
                                       BroadcastExchange #11
                                         WholeStageCodegen (8)
diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q58.sf100/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q58.sf100/explain.txt
index 7c81baa9931..26ffe2e0b32 100644
--- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q58.sf100/explain.txt
+++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q58.sf100/explain.txt
@@ -328,7 +328,7 @@ Input [2]: [d_date#40, d_week_seq#41]
 
 (55) Filter [codegen id : 1]
 Input [2]: [d_date#40, d_week_seq#41]
-Condition : (isnotnull(d_week_seq#41) AND (d_week_seq#41 = Subquery scalar-subquery#42, [id=#43]))
+Condition : (isnotnull(d_week_seq#41) AND (d_week_seq#41 = ReusedSubquery Subquery scalar-subquery#42, [id=#43]))
 
 (56) Project [codegen id : 1]
 Output [1]: [d_date#40]
@@ -352,7 +352,9 @@ Input [2]: [d_date_sk#5, d_date#39]
 Input [1]: [d_date_sk#5]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=8]
 
-Subquery:2 Hosting operator id = 55 Hosting Expression = Subquery scalar-subquery#42, [id=#43]
+Subquery:2 Hosting operator id = 55 Hosting Expression = ReusedSubquery Subquery scalar-subquery#42, [id=#43]
+
+Subquery:3 Hosting operator id = 53 Hosting Expression = Subquery scalar-subquery#42, [id=#43]
 * Project (64)
 +- * Filter (63)
    +- * ColumnarToRow (62)
@@ -377,8 +379,8 @@ Condition : (isnotnull(d_date#44) AND (d_date#44 = 2000-01-03))
 Output [1]: [d_week_seq#45]
 Input [2]: [d_date#44, d_week_seq#45]
 
-Subquery:3 Hosting operator id = 17 Hosting Expression = cs_sold_date_sk#15 IN dynamicpruning#4
+Subquery:4 Hosting operator id = 17 Hosting Expression = cs_sold_date_sk#15 IN dynamicpruning#4
 
-Subquery:4 Hosting operator id = 33 Hosting Expression = ws_sold_date_sk#26 IN dynamicpruning#4
+Subquery:5 Hosting operator id = 33 Hosting Expression = ws_sold_date_sk#26 IN dynamicpruning#4
 
 
diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q58.sf100/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q58.sf100/simplified.txt
index 42403c1c39a..7615324ade0 100644
--- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q58.sf100/simplified.txt
+++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q58.sf100/simplified.txt
@@ -32,16 +32,17 @@ TakeOrderedAndProject [item_id,ss_item_rev,ss_dev,cs_item_rev,cs_dev,ws_item_rev
                                                       WholeStageCodegen (1)
                                                         Project [d_date]
                                                           Filter [d_week_seq]
-                                                            Subquery #2
-                                                              WholeStageCodegen (1)
-                                                                Project [d_week_seq]
-                                                                  Filter [d_date]
-                                                                    ColumnarToRow
-                                                                      InputAdapter
-                                                                        Scan parquet spark_catalog.default.date_dim [d_date,d_week_seq]
+                                                            ReusedSubquery [d_week_seq] #2
                                                             ColumnarToRow
                                                               InputAdapter
                                                                 Scan parquet spark_catalog.default.date_dim [d_date,d_week_seq]
+                                                                  Subquery #2
+                                                                    WholeStageCodegen (1)
+                                                                      Project [d_week_seq]
+                                                                        Filter [d_date]
+                                                                          ColumnarToRow
+                                                                            InputAdapter
+                                                                              Scan parquet spark_catalog.default.date_dim [d_date,d_week_seq]
                                 InputAdapter
                                   ReusedExchange [d_date_sk] #2
                             InputAdapter
diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q58/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q58/explain.txt
index ff8c4392d26..cdb5e45f668 100644
--- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q58/explain.txt
+++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q58/explain.txt
@@ -328,7 +328,7 @@ Input [2]: [d_date#40, d_week_seq#41]
 
 (55) Filter [codegen id : 1]
 Input [2]: [d_date#40, d_week_seq#41]
-Condition : (isnotnull(d_week_seq#41) AND (d_week_seq#41 = Subquery scalar-subquery#42, [id=#43]))
+Condition : (isnotnull(d_week_seq#41) AND (d_week_seq#41 = ReusedSubquery Subquery scalar-subquery#42, [id=#43]))
 
 (56) Project [codegen id : 1]
 Output [1]: [d_date#40]
@@ -352,7 +352,9 @@ Input [2]: [d_date_sk#7, d_date#39]
 Input [1]: [d_date_sk#7]
 Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=8]
 
-Subquery:2 Hosting operator id = 55 Hosting Expression = Subquery scalar-subquery#42, [id=#43]
+Subquery:2 Hosting operator id = 55 Hosting Expression = ReusedSubquery Subquery scalar-subquery#42, [id=#43]
+
+Subquery:3 Hosting operator id = 53 Hosting Expression = Subquery scalar-subquery#42, [id=#43]
 * Project (64)
 +- * Filter (63)
    +- * ColumnarToRow (62)
@@ -377,8 +379,8 @@ Condition : (isnotnull(d_date#44) AND (d_date#44 = 2000-01-03))
 Output [1]: [d_week_seq#45]
 Input [2]: [d_date#44, d_week_seq#45]
 
-Subquery:3 Hosting operator id = 17 Hosting Expression = cs_sold_date_sk#15 IN dynamicpruning#4
+Subquery:4 Hosting operator id = 17 Hosting Expression = cs_sold_date_sk#15 IN dynamicpruning#4
 
-Subquery:4 Hosting operator id = 33 Hosting Expression = ws_sold_date_sk#26 IN dynamicpruning#4
+Subquery:5 Hosting operator id = 33 Hosting Expression = ws_sold_date_sk#26 IN dynamicpruning#4
 
 
diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q58/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q58/simplified.txt
index 4b99a85d0b7..348ffcd857b 100644
--- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q58/simplified.txt
+++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q58/simplified.txt
@@ -32,16 +32,17 @@ TakeOrderedAndProject [item_id,ss_item_rev,ss_dev,cs_item_rev,cs_dev,ws_item_rev
                                                       WholeStageCodegen (1)
                                                         Project [d_date]
                                                           Filter [d_week_seq]
-                                                            Subquery #2
-                                                              WholeStageCodegen (1)
-                                                                Project [d_week_seq]
-                                                                  Filter [d_date]
-                                                                    ColumnarToRow
-                                                                      InputAdapter
-                                                                        Scan parquet spark_catalog.default.date_dim [d_date,d_week_seq]
+                                                            ReusedSubquery [d_week_seq] #2
                                                             ColumnarToRow
                                                               InputAdapter
                                                                 Scan parquet spark_catalog.default.date_dim [d_date,d_week_seq]
+                                                                  Subquery #2
+                                                                    WholeStageCodegen (1)
+                                                                      Project [d_week_seq]
+                                                                        Filter [d_date]
+                                                                          ColumnarToRow
+                                                                            InputAdapter
+                                                                              Scan parquet spark_catalog.default.date_dim [d_date,d_week_seq]
                                 InputAdapter
                                   BroadcastExchange #4
                                     WholeStageCodegen (1)
diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q6.sf100/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q6.sf100/explain.txt
index 1261aab95e0..93db1e57839 100644
--- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q6.sf100/explain.txt
+++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q6.sf100/explain.txt
@@ -280,7 +280,7 @@ Input [2]: [d_date_sk#16, d_month_seq#26]
 
 (48) Filter [codegen id : 1]
 Input [2]: [d_date_sk#16, d_month_seq#26]
-Condition : ((isnotnull(d_month_seq#26) AND (d_month_seq#26 = Subquery scalar-subquery#27, [id=#28])) AND isnotnull(d_date_sk#16))
+Condition : ((isnotnull(d_month_seq#26) AND (d_month_seq#26 = ReusedSubquery Subquery scalar-subquery#27, [id=#28])) AND isnotnull(d_date_sk#16))
 
 (49) Project [codegen id : 1]
 Output [1]: [d_date_sk#16]
@@ -290,7 +290,9 @@ Input [2]: [d_date_sk#16, d_month_seq#26]
 Input [1]: [d_date_sk#16]
 Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=9]
 
-Subquery:2 Hosting operator id = 48 Hosting Expression = Subquery scalar-subquery#27, [id=#28]
+Subquery:2 Hosting operator id = 48 Hosting Expression = ReusedSubquery Subquery scalar-subquery#27, [id=#28]
+
+Subquery:3 Hosting operator id = 46 Hosting Expression = Subquery scalar-subquery#27, [id=#28]
 * HashAggregate (57)
 +- Exchange (56)
    +- * HashAggregate (55)
diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q6.sf100/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q6.sf100/simplified.txt
index 6931b58e7a8..d69eb47d92c 100644
--- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q6.sf100/simplified.txt
+++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q6.sf100/simplified.txt
@@ -49,21 +49,22 @@ TakeOrderedAndProject [cnt,state]
                                                     WholeStageCodegen (1)
                                                       Project [d_date_sk]
                                                         Filter [d_month_seq,d_date_sk]
-                                                          Subquery #2
-                                                            WholeStageCodegen (2)
-                                                              HashAggregate [d_month_seq]
-                                                                InputAdapter
-                                                                  Exchange [d_month_seq] #7
-                                                                    WholeStageCodegen (1)
-                                                                      HashAggregate [d_month_seq]
-                                                                        Project [d_month_seq]
-                                                                          Filter [d_year,d_moy]
-                                                                            ColumnarToRow
-                                                                              InputAdapter
-                                                                                Scan parquet spark_catalog.default.date_dim [d_month_seq,d_year,d_moy]
+                                                          ReusedSubquery [d_month_seq] #2
                                                           ColumnarToRow
                                                             InputAdapter
                                                              Scan parquet spark_catalog.default.date_dim [d_date_sk,d_month_seq]
+                                                                Subquery #2
+                                                                  WholeStageCodegen (2)
+                                                                    HashAggregate [d_month_seq]
+                                                                      InputAdapter
+                                                                        Exchange [d_month_seq] #7
+                                                                          WholeStageCodegen (1)
+                                                                            HashAggregate [d_month_seq]
+                                                                              Project [d_month_seq]
+                                                                                Filter [d_year,d_moy]
+                                                                                  ColumnarToRow
+                                                                                    InputAdapter
+                                                                                      Scan parquet spark_catalog.default.date_dim [d_month_seq,d_year,d_moy]
                                     InputAdapter
                                       ReusedExchange [d_date_sk] #6
                     InputAdapter
diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q6/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q6/explain.txt
index 7faf0cd7f16..bd5bdfb6661 100644
--- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q6/explain.txt
+++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q6/explain.txt
@@ -250,7 +250,7 @@ Input [2]: [d_date_sk#9, d_month_seq#26]
 
 (42) Filter [codegen id : 1]
 Input [2]: [d_date_sk#9, d_month_seq#26]
-Condition : ((isnotnull(d_month_seq#26) AND (d_month_seq#26 = Subquery scalar-subquery#27, [id=#28])) AND isnotnull(d_date_sk#9))
+Condition : ((isnotnull(d_month_seq#26) AND (d_month_seq#26 = ReusedSubquery Subquery scalar-subquery#27, [id=#28])) AND isnotnull(d_date_sk#9))
 
 (43) Project [codegen id : 1]
 Output [1]: [d_date_sk#9]
@@ -260,7 +260,9 @@ Input [2]: [d_date_sk#9, d_month_seq#26]
 Input [1]: [d_date_sk#9]
 Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=7]
 
-Subquery:2 Hosting operator id = 42 Hosting Expression = Subquery scalar-subquery#27, [id=#28]
+Subquery:2 Hosting operator id = 42 Hosting Expression = ReusedSubquery Subquery scalar-subquery#27, [id=#28]
+
+Subquery:3 Hosting operator id = 40 Hosting Expression = Subquery scalar-subquery#27, [id=#28]
 * HashAggregate (51)
 +- Exchange (50)
    +- * HashAggregate (49)
diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q6/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q6/simplified.txt
index 4a4b404758e..a15b638fbb2 100644
--- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q6/simplified.txt
+++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q6/simplified.txt
@@ -37,21 +37,22 @@ TakeOrderedAndProject [cnt,state]
                                               WholeStageCodegen (1)
                                                 Project [d_date_sk]
                                                    Filter [d_month_seq,d_date_sk]
-                                                    Subquery #2
-                                                      WholeStageCodegen (2)
-                                                        HashAggregate [d_month_seq]
-                                                          InputAdapter
-                                                            Exchange [d_month_seq] #5
-                                                              WholeStageCodegen (1)
-                                                                HashAggregate [d_month_seq]
-                                                                  Project [d_month_seq]
-                                                                    Filter [d_year,d_moy]
-                                                                      ColumnarToRow
-                                                                        InputAdapter
-                                                                          Scan parquet spark_catalog.default.date_dim [d_month_seq,d_year,d_moy]
+                                                    ReusedSubquery [d_month_seq] #2
                                                     ColumnarToRow
                                                       InputAdapter
                                                        Scan parquet spark_catalog.default.date_dim [d_date_sk,d_month_seq]
+                                                          Subquery #2
+                                                            WholeStageCodegen (2)
+                                                              HashAggregate [d_month_seq]
+                                                                InputAdapter
+                                                                  Exchange [d_month_seq] #5
+                                                                    WholeStageCodegen (1)
+                                                                      HashAggregate [d_month_seq]
+                                                                        Project [d_month_seq]
+                                                                          Filter [d_year,d_moy]
+                                                                            ColumnarToRow
+                                                                              InputAdapter
+                                                                                Scan parquet spark_catalog.default.date_dim [d_month_seq,d_year,d_moy]
                         InputAdapter
                           ReusedExchange [d_date_sk] #4
                     InputAdapter
diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14.sf100/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14.sf100/explain.txt
index e8f37353b25..1440326b862 100644
--- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14.sf100/explain.txt
+++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14.sf100/explain.txt
@@ -656,7 +656,7 @@ Input [2]: [d_date_sk#36, d_week_seq#100]
 
 (112) Filter [codegen id : 1]
 Input [2]: [d_date_sk#36, d_week_seq#100]
-Condition : ((isnotnull(d_week_seq#100) AND (d_week_seq#100 = Subquery scalar-subquery#101, [id=#102])) AND isnotnull(d_date_sk#36))
+Condition : ((isnotnull(d_week_seq#100) AND (d_week_seq#100 = ReusedSubquery Subquery scalar-subquery#101, [id=#102])) AND isnotnull(d_date_sk#36))
 
 (113) Project [codegen id : 1]
 Output [1]: [d_date_sk#36]
@@ -666,7 +666,9 @@ Input [2]: [d_date_sk#36, d_week_seq#100]
 Input [1]: [d_date_sk#36]
 Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=15]
 
-Subquery:6 Hosting operator id = 112 Hosting Expression = Subquery scalar-subquery#101, [id=#102]
+Subquery:6 Hosting operator id = 112 Hosting Expression = ReusedSubquery Subquery scalar-subquery#101, [id=#102]
+
+Subquery:7 Hosting operator id = 110 Hosting Expression = Subquery scalar-subquery#101, [id=#102]
 * Project (118)
 +- * Filter (117)
    +- * ColumnarToRow (116)
@@ -691,7 +693,7 @@ Condition : (((((isnotnull(d_year#104) AND isnotnull(d_moy#105)) AND isnotnull(d
 Output [1]: [d_week_seq#103]
 Input [4]: [d_week_seq#103, d_year#104, d_moy#105, d_dom#106]
 
-Subquery:7 Hosting operator id = 7 Hosting Expression = ss_sold_date_sk#11 IN dynamicpruning#12
+Subquery:8 Hosting operator id = 7 Hosting Expression = ss_sold_date_sk#11 IN dynamicpruning#12
 BroadcastExchange (123)
 +- * Project (122)
    +- * Filter (121)
@@ -721,13 +723,13 @@ Input [2]: [d_date_sk#13, d_year#107]
 Input [1]: [d_date_sk#13]
 Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=16]
 
-Subquery:8 Hosting operator id = 18 Hosting Expression = cs_sold_date_sk#19 IN dynamicpruning#12
+Subquery:9 Hosting operator id = 18 Hosting Expression = cs_sold_date_sk#19 IN dynamicpruning#12
 
-Subquery:9 Hosting operator id = 41 Hosting Expression = ws_sold_date_sk#29 IN dynamicpruning#12
+Subquery:10 Hosting operator id = 41 Hosting Expression = ws_sold_date_sk#29 IN dynamicpruning#12
 
-Subquery:10 Hosting operator id = 87 Hosting Expression = ReusedSubquery Subquery scalar-subquery#52, [id=#53]
+Subquery:11 Hosting operator id = 87 Hosting Expression = ReusedSubquery Subquery scalar-subquery#52, [id=#53]
 
-Subquery:11 Hosting operator id = 73 Hosting Expression = ss_sold_date_sk#57 IN dynamicpruning#58
+Subquery:12 Hosting operator id = 73 Hosting Expression = ss_sold_date_sk#57 IN dynamicpruning#58
 BroadcastExchange (128)
 +- * Project (127)
    +- * Filter (126)
@@ -747,7 +749,7 @@ Input [2]: [d_date_sk#60, d_week_seq#108]
 
 (126) Filter [codegen id : 1]
 Input [2]: [d_date_sk#60, d_week_seq#108]
-Condition : ((isnotnull(d_week_seq#108) AND (d_week_seq#108 = Subquery scalar-subquery#109, [id=#110])) AND isnotnull(d_date_sk#60))
+Condition : ((isnotnull(d_week_seq#108) AND (d_week_seq#108 = ReusedSubquery Subquery scalar-subquery#109, [id=#110])) AND isnotnull(d_date_sk#60))
 
 (127) Project [codegen id : 1]
 Output [1]: [d_date_sk#60]
@@ -757,7 +759,9 @@ Input [2]: [d_date_sk#60, d_week_seq#108]
 Input [1]: [d_date_sk#60]
 Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=17]
 
-Subquery:12 Hosting operator id = 126 Hosting Expression = Subquery scalar-subquery#109, [id=#110]
+Subquery:13 Hosting operator id = 126 Hosting Expression = ReusedSubquery Subquery scalar-subquery#109, [id=#110]
+
+Subquery:14 Hosting operator id = 124 Hosting Expression = Subquery scalar-subquery#109, [id=#110]
 * Project (132)
 +- * Filter (131)
    +- * ColumnarToRow (130)
diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14.sf100/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14.sf100/simplified.txt
index edd34864986..4db035858b2 100644
--- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14.sf100/simplified.txt
+++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14.sf100/simplified.txt
@@ -57,16 +57,17 @@ TakeOrderedAndProject [i_brand_id,i_class_id,i_category_id,channel,sales,number_
                                         WholeStageCodegen (1)
                                           Project [d_date_sk]
                                             Filter [d_week_seq,d_date_sk]
-                                              Subquery #2
-                                                WholeStageCodegen (1)
-                                                  Project [d_week_seq]
-                                                    Filter [d_year,d_moy,d_dom]
-                                                      ColumnarToRow
-                                                        InputAdapter
-                                                          Scan parquet spark_catalog.default.date_dim [d_week_seq,d_year,d_moy,d_dom]
+                                              ReusedSubquery [d_week_seq] #2
                                               ColumnarToRow
                                                 InputAdapter
                                                   Scan parquet spark_catalog.default.date_dim [d_date_sk,d_week_seq]
+                                                    Subquery #2
+                                                      WholeStageCodegen (1)
+                                                        Project [d_week_seq]
+                                                          Filter [d_year,d_moy,d_dom]
+                                                            ColumnarToRow
+                                                              InputAdapter
+                                                                Scan parquet spark_catalog.default.date_dim [d_week_seq,d_year,d_moy,d_dom]
                             InputAdapter
                               BroadcastExchange #3
                                 WholeStageCodegen (17)
@@ -202,16 +203,17 @@ TakeOrderedAndProject [i_brand_id,i_class_id,i_category_id,channel,sales,number_
                                               WholeStageCodegen (1)
                                                 Project [d_date_sk]
                                                   Filter [d_week_seq,d_date_sk]
-                                                    Subquery #6
-                                                      WholeStageCodegen (1)
-                                                        Project [d_week_seq]
-                                                          Filter [d_year,d_moy,d_dom]
-                                                            ColumnarToRow
-                                                              InputAdapter
-                                                                Scan parquet spark_catalog.default.date_dim [d_week_seq,d_year,d_moy,d_dom]
+                                                    ReusedSubquery [d_week_seq] #6
                                                     ColumnarToRow
                                                       InputAdapter
                                                        Scan parquet spark_catalog.default.date_dim [d_date_sk,d_week_seq]
+                                                          Subquery #6
+                                                            WholeStageCodegen (1)
+                                                              Project [d_week_seq]
+                                                                Filter [d_year,d_moy,d_dom]
+                                                                  ColumnarToRow
+                                                                    InputAdapter
+                                                                      Scan parquet spark_catalog.default.date_dim [d_week_seq,d_year,d_moy,d_dom]
                                   InputAdapter
                                     ReusedExchange [ss_item_sk] #3
                                 InputAdapter
diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14/explain.txt
index b33fb2ab421..1e4ca929b96 100644
--- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14/explain.txt
+++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14/explain.txt
@@ -626,7 +626,7 @@ Input [2]: [d_date_sk#40, d_week_seq#100]
 
 (106) Filter [codegen id : 1]
 Input [2]: [d_date_sk#40, d_week_seq#100]
-Condition : ((isnotnull(d_week_seq#100) AND (d_week_seq#100 = Subquery scalar-subquery#101, [id=#102])) AND isnotnull(d_date_sk#40))
+Condition : ((isnotnull(d_week_seq#100) AND (d_week_seq#100 = ReusedSubquery Subquery scalar-subquery#101, [id=#102])) AND isnotnull(d_date_sk#40))
 
 (107) Project [codegen id : 1]
 Output [1]: [d_date_sk#40]
@@ -636,7 +636,9 @@ Input [2]: [d_date_sk#40, d_week_seq#100]
 Input [1]: [d_date_sk#40]
 Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=13]
 
-Subquery:6 Hosting operator id = 106 Hosting Expression = Subquery scalar-subquery#101, [id=#102]
+Subquery:6 Hosting operator id = 106 Hosting Expression = ReusedSubquery Subquery scalar-subquery#101, [id=#102]
+
+Subquery:7 Hosting operator id = 104 Hosting Expression = Subquery scalar-subquery#101, [id=#102]
 * Project (112)
 +- * Filter (111)
    +- * ColumnarToRow (110)
@@ -661,7 +663,7 @@ Condition : (((((isnotnull(d_year#104) AND isnotnull(d_moy#105)) AND isnotnull(d
 Output [1]: [d_week_seq#103]
 Input [4]: [d_week_seq#103, d_year#104, d_moy#105, d_dom#106]
 
-Subquery:7 Hosting operator id = 7 Hosting Expression = ss_sold_date_sk#11 IN dynamicpruning#12
+Subquery:8 Hosting operator id = 7 Hosting Expression = ss_sold_date_sk#11 IN dynamicpruning#12
 BroadcastExchange (117)
 +- * Project (116)
    +- * Filter (115)
@@ -691,13 +693,13 @@ Input [2]: [d_date_sk#24, d_year#107]
 Input [1]: [d_date_sk#24]
 Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=14]
 
-Subquery:8 Hosting operator id = 13 Hosting Expression = cs_sold_date_sk#18 IN dynamicpruning#12
+Subquery:9 Hosting operator id = 13 Hosting Expression = cs_sold_date_sk#18 IN dynamicpruning#12
 
-Subquery:9 Hosting operator id = 36 Hosting Expression = ws_sold_date_sk#29 IN dynamicpruning#12
+Subquery:10 Hosting operator id = 36 Hosting Expression = ws_sold_date_sk#29 IN dynamicpruning#12
 
-Subquery:10 Hosting operator id = 81 Hosting Expression = ReusedSubquery Subquery scalar-subquery#52, [id=#53]
+Subquery:11 Hosting operator id = 81 Hosting Expression = ReusedSubquery Subquery scalar-subquery#52, [id=#53]
 
-Subquery:11 Hosting operator id = 67 Hosting Expression = ss_sold_date_sk#57 IN dynamicpruning#58
+Subquery:12 Hosting operator id = 67 Hosting Expression = ss_sold_date_sk#57 IN dynamicpruning#58
 BroadcastExchange (122)
 +- * Project (121)
    +- * Filter (120)
@@ -717,7 +719,7 @@ Input [2]: [d_date_sk#64, d_week_seq#108]
 
 (120) Filter [codegen id : 1]
 Input [2]: [d_date_sk#64, d_week_seq#108]
-Condition : ((isnotnull(d_week_seq#108) AND (d_week_seq#108 = Subquery scalar-subquery#109, [id=#110])) AND isnotnull(d_date_sk#64))
+Condition : ((isnotnull(d_week_seq#108) AND (d_week_seq#108 = ReusedSubquery Subquery scalar-subquery#109, [id=#110])) AND isnotnull(d_date_sk#64))
 
 (121) Project [codegen id : 1]
 Output [1]: [d_date_sk#64]
@@ -727,7 +729,9 @@ Input [2]: [d_date_sk#64, d_week_seq#108]
 Input [1]: [d_date_sk#64]
 Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=15]
 
-Subquery:12 Hosting operator id = 120 Hosting Expression = Subquery scalar-subquery#109, [id=#110]
+Subquery:13 Hosting operator id = 120 Hosting Expression = ReusedSubquery Subquery scalar-subquery#109, [id=#110]
+
+Subquery:14 Hosting operator id = 118 Hosting Expression = Subquery scalar-subquery#109, [id=#110]
 * Project (126)
 +- * Filter (125)
    +- * ColumnarToRow (124)
diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14/simplified.txt
index 8d8dcccd5d7..b103e79d118 100644
--- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14/simplified.txt
+++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14/simplified.txt
@@ -57,16 +57,17 @@ TakeOrderedAndProject [i_brand_id,i_class_id,i_category_id,channel,sales,number_
                                         WholeStageCodegen (1)
                                           Project [d_date_sk]
                                             Filter [d_week_seq,d_date_sk]
-                                              Subquery #2
-                                                WholeStageCodegen (1)
-                                                  Project [d_week_seq]
-                                                    Filter [d_year,d_moy,d_dom]
-                                                      ColumnarToRow
-                                                        InputAdapter
-                                                          Scan parquet spark_catalog.default.date_dim [d_week_seq,d_year,d_moy,d_dom]
+                                              ReusedSubquery [d_week_seq] #2
                                               ColumnarToRow
                                                 InputAdapter
                                                   Scan parquet spark_catalog.default.date_dim [d_date_sk,d_week_seq]
+                                                    Subquery #2
+                                                      WholeStageCodegen (1)
+                                                        Project [d_week_seq]
+                                                          Filter [d_year,d_moy,d_dom]
+                                                            ColumnarToRow
+                                                              InputAdapter
+                                                                Scan parquet spark_catalog.default.date_dim [d_week_seq,d_year,d_moy,d_dom]
                             InputAdapter
                               BroadcastExchange #3
                                 WholeStageCodegen (11)
@@ -184,16 +185,17 @@ TakeOrderedAndProject [i_brand_id,i_class_id,i_category_id,channel,sales,number_
                                               WholeStageCodegen (1)
                                                 Project [d_date_sk]
                                                   Filter [d_week_seq,d_date_sk]
-                                                    Subquery #6
-                                                      WholeStageCodegen (1)
-                                                        Project [d_week_seq]
-                                                          Filter [d_year,d_moy,d_dom]
-                                                            ColumnarToRow
-                                                              InputAdapter
-                                                                Scan parquet spark_catalog.default.date_dim [d_week_seq,d_year,d_moy,d_dom]
+                                                    ReusedSubquery [d_week_seq] #6
                                                     ColumnarToRow
                                                       InputAdapter
                                                        Scan parquet spark_catalog.default.date_dim [d_date_sk,d_week_seq]
+                                                          Subquery #6
+                                                            WholeStageCodegen (1)
+                                                              Project [d_week_seq]
+                                                                Filter [d_year,d_moy,d_dom]
+                                                                  ColumnarToRow
+                                                                    InputAdapter
+                                                                      Scan parquet spark_catalog.default.date_dim [d_week_seq,d_year,d_moy,d_dom]
                                   InputAdapter
                                     ReusedExchange [ss_item_sk] #3
                                 InputAdapter
diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.sf100/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.sf100/explain.txt
index a776a5c8d8f..55bed0dade7 100644
--- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.sf100/explain.txt
+++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.sf100/explain.txt
@@ -280,7 +280,7 @@ Input [2]: [d_date_sk#16, d_month_seq#26]
 
 (48) Filter [codegen id : 1]
 Input [2]: [d_date_sk#16, d_month_seq#26]
-Condition : ((isnotnull(d_month_seq#26) AND (d_month_seq#26 = Subquery scalar-subquery#27, [id=#28])) AND isnotnull(d_date_sk#16))
+Condition : ((isnotnull(d_month_seq#26) AND (d_month_seq#26 = ReusedSubquery Subquery scalar-subquery#27, [id=#28])) AND isnotnull(d_date_sk#16))
 
 (49) Project [codegen id : 1]
 Output [1]: [d_date_sk#16]
@@ -290,7 +290,9 @@ Input [2]: [d_date_sk#16, d_month_seq#26]
 Input [1]: [d_date_sk#16]
 Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=9]
 
-Subquery:2 Hosting operator id = 48 Hosting Expression = Subquery scalar-subquery#27, [id=#28]
+Subquery:2 Hosting operator id = 48 Hosting Expression = ReusedSubquery Subquery scalar-subquery#27, [id=#28]
+
+Subquery:3 Hosting operator id = 46 Hosting Expression = Subquery scalar-subquery#27, [id=#28]
 * HashAggregate (57)
 +- Exchange (56)
    +- * HashAggregate (55)
diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.sf100/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.sf100/simplified.txt
index 62f3073b0c7..7339df16a28 100644
--- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.sf100/simplified.txt
+++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.sf100/simplified.txt
@@ -49,21 +49,22 @@ TakeOrderedAndProject [cnt,ca_state,state]
                                                     WholeStageCodegen (1)
                                                       Project [d_date_sk]
                                                         Filter [d_month_seq,d_date_sk]
-                                                          Subquery #2
-                                                            WholeStageCodegen (2)
-                                                              HashAggregate [d_month_seq]
-                                                                InputAdapter
-                                                                  Exchange [d_month_seq] #7
-                                                                    WholeStageCodegen (1)
-                                                                      HashAggregate [d_month_seq]
-                                                                        Project [d_month_seq]
-                                                                          Filter [d_year,d_moy]
-                                                                            ColumnarToRow
-                                                                              InputAdapter
-                                                                                Scan parquet spark_catalog.default.date_dim [d_month_seq,d_year,d_moy]
+                                                          ReusedSubquery [d_month_seq] #2
                                                           ColumnarToRow
                                                             InputAdapter
                                                              Scan parquet spark_catalog.default.date_dim [d_date_sk,d_month_seq]
+                                                                Subquery #2
+                                                                  WholeStageCodegen (2)
+                                                                    HashAggregate [d_month_seq]
+                                                                      InputAdapter
+                                                                        Exchange [d_month_seq] #7
+                                                                          WholeStageCodegen (1)
+                                                                            HashAggregate [d_month_seq]
+                                                                              Project [d_month_seq]
+                                                                                Filter [d_year,d_moy]
+                                                                                  ColumnarToRow
+                                                                                    InputAdapter
+                                                                                      Scan parquet spark_catalog.default.date_dim [d_month_seq,d_year,d_moy]
                                     InputAdapter
                                       ReusedExchange [d_date_sk] #6
                     InputAdapter
diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/explain.txt
index 909869bcbc3..6713acc9754 100644
--- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/explain.txt
+++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/explain.txt
@@ -250,7 +250,7 @@ Input [2]: [d_date_sk#9, d_month_seq#26]
 
 (42) Filter [codegen id : 1]
 Input [2]: [d_date_sk#9, d_month_seq#26]
-Condition : ((isnotnull(d_month_seq#26) AND (d_month_seq#26 = Subquery scalar-subquery#27, [id=#28])) AND isnotnull(d_date_sk#9))
+Condition : ((isnotnull(d_month_seq#26) AND (d_month_seq#26 = ReusedSubquery Subquery scalar-subquery#27, [id=#28])) AND isnotnull(d_date_sk#9))
 
 (43) Project [codegen id : 1]
 Output [1]: [d_date_sk#9]
@@ -260,7 +260,9 @@ Input [2]: [d_date_sk#9, d_month_seq#26]
 Input [1]: [d_date_sk#9]
 Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=7]
 
-Subquery:2 Hosting operator id = 42 Hosting Expression = Subquery scalar-subquery#27, [id=#28]
+Subquery:2 Hosting operator id = 42 Hosting Expression = ReusedSubquery Subquery scalar-subquery#27, [id=#28]
+
+Subquery:3 Hosting operator id = 40 Hosting Expression = Subquery scalar-subquery#27, [id=#28]
 * HashAggregate (51)
 +- Exchange (50)
    +- * HashAggregate (49)
diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/simplified.txt
index 1b29f508a02..c9c8358ba0b 100644
--- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/simplified.txt
+++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/simplified.txt
@@ -37,21 +37,22 @@ TakeOrderedAndProject [cnt,ca_state,state]
                                               WholeStageCodegen (1)
                                                 Project [d_date_sk]
                                                    Filter [d_month_seq,d_date_sk]
-                                                    Subquery #2
-                                                      WholeStageCodegen (2)
-                                                        HashAggregate [d_month_seq]
-                                                          InputAdapter
-                                                            Exchange [d_month_seq] #5
-                                                              WholeStageCodegen (1)
-                                                                HashAggregate [d_month_seq]
-                                                                  Project [d_month_seq]
-                                                                    Filter [d_year,d_moy]
-                                                                      ColumnarToRow
-                                                                        InputAdapter
-                                                                          Scan parquet spark_catalog.default.date_dim [d_month_seq,d_year,d_moy]
+                                                    ReusedSubquery [d_month_seq] #2
                                                     ColumnarToRow
                                                       InputAdapter
                                                        Scan parquet spark_catalog.default.date_dim [d_date_sk,d_month_seq]
+                                                          Subquery #2
+                                                            WholeStageCodegen (2)
+                                                              HashAggregate [d_month_seq]
+                                                                InputAdapter
+                                                                  Exchange [d_month_seq] #5
+                                                                    WholeStageCodegen (1)
+                                                                      HashAggregate [d_month_seq]
+                                                                        Project [d_month_seq]
+                                                                          Filter [d_year,d_moy]
+                                                                            ColumnarToRow
+                                                                              InputAdapter
+                                                                                Scan parquet spark_catalog.default.date_dim [d_month_seq,d_year,d_moy]
                         InputAdapter
                           ReusedExchange [d_date_sk] #4
                     InputAdapter
diff --git a/sql/core/src/test/resources/tpch-plan-stability/q22/explain.txt b/sql/core/src/test/resources/tpch-plan-stability/q22/explain.txt
index 14405dab7bf..eafde15b7e1 100644
--- a/sql/core/src/test/resources/tpch-plan-stability/q22/explain.txt
+++ b/sql/core/src/test/resources/tpch-plan-stability/q22/explain.txt
@@ -26,7 +26,7 @@ Input [3]: [c_custkey#1, c_phone#2, c_acctbal#3]
 
 (3) Filter [codegen id : 2]
 Input [3]: [c_custkey#1, c_phone#2, c_acctbal#3]
-Condition : ((isnotnull(c_acctbal#3) AND substring(c_phone#2, 1, 2) IN (13,31,23,29,30,18,17)) AND (cast(c_acctbal#3 as decimal(14,4)) > Subquery scalar-subquery#4, [id=#5]))
+Condition : ((isnotnull(c_acctbal#3) AND substring(c_phone#2, 1, 2) IN (13,31,23,29,30,18,17)) AND (cast(c_acctbal#3 as decimal(14,4)) > ReusedSubquery Subquery scalar-subquery#4, [id=#5]))
 
 (4) Scan parquet spark_catalog.default.orders
 Output [1]: [o_custkey#6]
@@ -79,7 +79,9 @@ Arguments: [cntrycode#7 ASC NULLS FIRST], true, 0
 
 ===== Subqueries =====
 
-Subquery:1 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery#4, [id=#5]
+Subquery:1 Hosting operator id = 3 Hosting Expression = ReusedSubquery Subquery scalar-subquery#4, [id=#5]
+
+Subquery:2 Hosting operator id = 1 Hosting Expression = Subquery scalar-subquery#4, [id=#5]
 * HashAggregate (20)
 +- Exchange (19)
    +- * HashAggregate (18)
diff --git a/sql/core/src/test/resources/tpch-plan-stability/q22/simplified.txt b/sql/core/src/test/resources/tpch-plan-stability/q22/simplified.txt
index 51e531a1e8a..8ef5a20269b 100644
--- a/sql/core/src/test/resources/tpch-plan-stability/q22/simplified.txt
+++ b/sql/core/src/test/resources/tpch-plan-stability/q22/simplified.txt
@@ -11,21 +11,22 @@ WholeStageCodegen (4)
                     Project [c_phone,c_acctbal]
                       BroadcastHashJoin [c_custkey,o_custkey]
                         Filter [c_acctbal,c_phone]
-                          Subquery #1
-                            WholeStageCodegen (2)
-                              HashAggregate [sum,count] [avg(UnscaledValue(c_acctbal)),avg(c_acctbal),sum,count]
-                                InputAdapter
-                                  Exchange #3
-                                    WholeStageCodegen (1)
-                                      HashAggregate [c_acctbal] [sum,count,sum,count]
-                                        Project [c_acctbal]
-                                          Filter [c_acctbal,c_phone]
-                                            ColumnarToRow
-                                              InputAdapter
-                                                Scan parquet spark_catalog.default.customer [c_phone,c_acctbal]
+                          ReusedSubquery [avg(c_acctbal)] #1
                           ColumnarToRow
                             InputAdapter
                               Scan parquet spark_catalog.default.customer [c_custkey,c_phone,c_acctbal]
+                                Subquery #1
+                                  WholeStageCodegen (2)
+                                    HashAggregate [sum,count] [avg(UnscaledValue(c_acctbal)),avg(c_acctbal),sum,count]
+                                      InputAdapter
+                                        Exchange #3
+                                          WholeStageCodegen (1)
+                                            HashAggregate [c_acctbal] [sum,count,sum,count]
+                                              Project [c_acctbal]
+                                                Filter [c_acctbal,c_phone]
+                                                  ColumnarToRow
+                                                    InputAdapter
+                                                      Scan parquet spark_catalog.default.customer [c_phone,c_acctbal]
                         InputAdapter
                           BroadcastExchange #4
                             WholeStageCodegen (1)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
index f33432ddb6f..555679edad1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
@@ -1370,7 +1370,8 @@ abstract class DynamicPartitionPruningSuiteBase
         }
 
         assert(subqueryIds.size == 2, "Whole plan subquery reusing not working correctly")
-        assert(reusedSubqueryIds.size == 1, "Whole plan subquery reusing not working correctly")
+        assert(reusedSubqueryIds.distinct.size == 1,
+          "Whole plan subquery reusing not working correctly")
         assert(reusedSubqueryIds.forall(subqueryIds.contains(_)),
           "ReusedSubqueryExec should reuse an existing subquery")
       }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index d235d2a15fe..65c766af19a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -1572,18 +1572,6 @@ class SubquerySuite extends QueryTest
     }
   }
 
-  test("SPARK-25482: Forbid pushdown to datasources of filters containing 
subqueries") {
-    withTempView("t1", "t2") {
-      sql("create temporary view t1(a int) using parquet")
-      sql("create temporary view t2(b int) using parquet")
-      val plan = sql("select * from t2 where b > (select max(a) from t1)")
-      val subqueries = stripAQEPlan(plan.queryExecution.executedPlan).collect {
-        case p => p.subqueries
-      }.flatten
-      assert(subqueries.length == 1)
-    }
-  }
-
   test("SPARK-26893: Allow pushdown of partition pruning subquery filters to 
file source") {
     withTable("a", "b") {
       spark.range(4).selectExpr("id", "id % 2 AS 
p").write.partitionBy("p").saveAsTable("a")
@@ -2712,4 +2700,37 @@ class SubquerySuite extends QueryTest
         expected)
     }
   }
+
+  test("SPARK-43402: FileSourceScanExec supports push down data filter with 
scalar subquery") {
+    def checkFileSourceScan(query: String, answer: Seq[Row]): Unit = {
+      val df = sql(query)
+      checkAnswer(df, answer)
+      val fileSourceScanExec = collect(df.queryExecution.executedPlan) {
+        case f: FileSourceScanExec => f
+      }
+      sparkContext.listenerBus.waitUntilEmpty()
+      assert(fileSourceScanExec.size === 1)
+      val scalarSubquery = fileSourceScanExec.head.dataFilters.flatMap(_.collect {
+        case s: ScalarSubquery => s
+      })
+      assert(scalarSubquery.length === 1)
+      assert(scalarSubquery.head.plan.isInstanceOf[ReusedSubqueryExec])
+      assert(fileSourceScanExec.head.metrics("numFiles").value === 1)
+      assert(fileSourceScanExec.head.metrics("numOutputRows").value === answer.size)
+    }
+
+    withTable("t1", "t2") {
+      withSQLConf(SQLConf.LEAF_NODE_DEFAULT_PARALLELISM.key -> "1") {
+        Seq(1, 2, 3).toDF("c1").write.format("parquet").saveAsTable("t1")
+        Seq(4, 5, 6).toDF("c2").write.format("parquet").saveAsTable("t2")
+
+        checkFileSourceScan(
+          "SELECT * FROM t1 WHERE c1 > (SELECT min(c2) FROM t2)",
+          Seq.empty)
+        checkFileSourceScan(
+          "SELECT * FROM t1 WHERE c1 < (SELECT min(c2) FROM t2)",
+          Row(1) :: Row(2) :: Row(3) :: Nil)
+      }
+    }
+  }
 }

