AngersZhuuuu edited a comment on pull request #31485:
URL: https://github.com/apache/spark/pull/31485#issuecomment-774843305


   > ```sql
   > create table t1 using parquet as select id as a, id as b, id as c from range(1000000);
   > create table t2 using parquet as select id as a, id as b, id as c from range(1000000);
   > create table t3 using parquet as select id as a, id as b, id as c from range(1000000);
   > analyze table t1 compute statistics for all columns;
   > analyze table t2 compute statistics for all columns;
   > analyze table t3 compute statistics for all columns;
   > 
   > set spark.sql.cbo.enabled=true;
   > explain cost
   > select * from t3 where c > (select max(t1.c) as tc from t1 join t2 on t1.a = t2.a and t2.b < 10);
   > ```
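
   As a side note (not part of the quoted repro), the column statistics that CBO relies on here can be double-checked after the `analyze` statements, e.g.:
   ```sql
   -- shows the column-level stats (min, max, distinct_count, ...) collected by ANALYZE
   describe extended t2 b;
   ```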
   
   
   After this PR, the physical plan is:
   ```
   == Physical Plan ==
   AdaptiveSparkPlan isFinalPlan=false
   +- Filter (isnotnull(c#9787L) AND (c#9787L > Subquery subquery#9784, [id=#246]))
      :  +- Subquery subquery#9784, [id=#246]
      :     +- AdaptiveSparkPlan isFinalPlan=false
      :        +- HashAggregate(keys=[], functions=[max(c#9790L)], output=[tc#9783L])
      :           +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#244]
      :              +- HashAggregate(keys=[], functions=[partial_max(c#9790L)], output=[max#9799L])
      :                 +- Project [c#9790L]
      :                    +- BroadcastHashJoin [a#9788L], [a#9791L], Inner, BuildRight, false
      :                       :- Filter isnotnull(a#9788L)
      :                       :  +- FileScan parquet default.t1[a#9788L,c#9790L] Batched: true, DataFilters: [isnotnull(a#9788L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yi.zhu/Documents/project/Angerszhuuuu/spark/spark-warehous..., PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct<a:bigint,c:bigint>
      :                       +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]),false), [id=#239]
      :                          +- Project [a#9791L]
      :                             +- Filter ((isnotnull(b#9792L) AND (b#9792L < 10)) AND isnotnull(a#9791L))
      :                                +- FileScan parquet default.t2[a#9791L,b#9792L] Batched: true, DataFilters: [isnotnull(b#9792L), (b#9792L < 10), isnotnull(a#9791L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yi.zhu/Documents/project/Angerszhuuuu/spark/spark-warehous..., PartitionFilters: [], PushedFilters: [IsNotNull(b), LessThan(b,10), IsNotNull(a)], ReadSchema: struct<a:bigint,b:bigint>
      +- FileScan parquet default.t3[a#9785L,b#9786L,c#9787L] Batched: true, DataFilters: [isnotnull(c#9787L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yi.zhu/Documents/project/Angerszhuuuu/spark/spark-warehous..., PartitionFilters: [], PushedFilters: [IsNotNull(c)], ReadSchema: struct<a:bigint,b:bigint,c:bigint>
   ```
   
   After lowering the broadcast threshold so that t2 no longer qualifies for a broadcast join
   ```sql
   set spark.sql.autoBroadcastJoinThreshold=10;
   ```
   and re-running the same `explain cost` statement, the plan becomes:
   ```
   == Physical Plan ==
   AdaptiveSparkPlan isFinalPlan=false
   +- Filter (isnotnull(c#9797L) AND (c#9797L > Subquery subquery#9794, [id=#253]))
      :  +- Subquery subquery#9794, [id=#253]
      :     +- AdaptiveSparkPlan isFinalPlan=false
      :        +- HashAggregate(keys=[], functions=[max(c#9800L)])
      :           +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#251]
      :              +- HashAggregate(keys=[], functions=[partial_max(c#9800L)])
      :                 +- Project [c#9800L]
      :                    +- SortMergeJoin [a#9798L], [a#9801L], Inner
      :                       :- Sort [a#9798L ASC NULLS FIRST], false, 0
      :                       :  +- Exchange hashpartitioning(a#9798L, 5), ENSURE_REQUIREMENTS, [id=#243]
      :                       :     +- Filter isnotnull(a#9798L)
      :                       :        +- FileScan parquet default.t1[a#9798L,c#9800L] Batched: true, DataFilters: [isnotnull(a#9798L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yi.zhu/Documents/project/Angerszhuuuu/spark/spark-warehous..., PartitionFilters: [], PushedFilters: [IsNotNull(a)], ReadSchema: struct<a:bigint,c:bigint>
      :                       +- Sort [a#9801L ASC NULLS FIRST], false, 0
      :                          +- Exchange hashpartitioning(a#9801L, 5), ENSURE_REQUIREMENTS, [id=#244]
      :                             +- Project [a#9801L]
      :                                +- Filter ((isnotnull(b#9802L) AND (b#9802L < 10)) AND isnotnull(a#9801L))
      :                                   +- FileScan parquet default.t2[a#9801L,b#9802L] Batched: true, DataFilters: [isnotnull(b#9802L), (b#9802L < 10), isnotnull(a#9801L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yi.zhu/Documents/project/Angerszhuuuu/spark/spark-warehous..., PartitionFilters: [], PushedFilters: [IsNotNull(b), LessThan(b,10), IsNotNull(a)], ReadSchema: struct<a:bigint,b:bigint>
      +- FileScan parquet default.t3[a#9795L,b#9796L,c#9797L] Batched: true, DataFilters: [isnotnull(c#9797L)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yi.zhu/Documents/project/Angerszhuuuu/spark/spark-warehous..., PartitionFilters: [], PushedFilters: [IsNotNull(c)], ReadSchema: struct<a:bigint,b:bigint,c:bigint>
   ```
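
   (If anyone re-runs this locally: the `set` above drops the threshold to 10 bytes only to force the sort-merge plan; the default can be restored afterwards, e.g.:)
   ```sql
   -- restore the default broadcast threshold (10MB) after the experiment
   set spark.sql.autoBroadcastJoinThreshold=10485760;
   ```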
   

