[ 
https://issues.apache.org/jira/browse/SPARK-25403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-25403:
-----------------------------------

    Assignee: Yuming Wang

> Broadcast join is changing to sort merge join , after spark-beeline session 
> restarts.
> -------------------------------------------------------------------------------------
>
>                 Key: SPARK-25403
>                 URL: https://issues.apache.org/jira/browse/SPARK-25403
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.2.1, 2.3.1
>         Environment: Spark 2.3.1
> Hadoop 2.7.2
>            Reporter: Ayush Anubhava
>            Assignee: Yuming Wang
>            Priority: Major
>
> *Issue 1*: {color:#ff0000}Broadcast join changes to sort merge join after
> the spark-beeline session restarts{color}.
> *Precondition* : The JDBC/Thrift Server is continuously running. 
> *Steps:*
>  
> {code:java}
> 0: jdbc:hive2://10.18.18.214:23040/default> use x1;
> +---------+--+
> | Result |
> +---------+--+
> +---------+--+
> 0: jdbc:hive2://10.18.18.214:23040/default> create table cv (a int, b string) 
> stored as parquet;
> +---------+--+
> | Result |
> +---------+--+
> +---------+--+
> 0: jdbc:hive2://10.18.18.214:23040/default> create table c (a int, b string) 
> stored as parquet;
> +---------+--+
> | Result |
> +---------+--+
> +---------+--+
> 0: jdbc:hive2://10.18.18.214:23040/default> insert into table c values 
> (1,'a');
> +---------+--+
> | Result |
> +---------+--+
> +---------+--+
> 0: jdbc:hive2://10.18.18.214:23040/default> insert into table cv values 
> (1,'a');
> +---------+--+
> | Result |
> +---------+--+
> +---------+--+
> 0: jdbc:hive2://10.18.18.214:23040/default> select * from c , cv where c.a = 
> cv.
> +----+----+----+----+--+
> | a | b | a | b |
> +----+----+----+----+--+
> | 1 | a | 1 | a |
> +----+----+----+----+--+
> {code}
>  
> {code:java}
> Before Restarting the session (spark-beeline)
> {code}
> *{color:#d04437}explain select * from c , cv where c.a = cv.a;{color}*
> |== Physical Plan ==
> *(2) {color:#d04437}BroadcastHashJoin{color} [a#3284], [a#3286], Inner, BuildRight
> :- *(2) Project [a#3284, b#3285]
> :  +- *(2) Filter isnotnull(a#3284)
> :     +- *(2) FileScan parquet x1.c[a#3284,b#3285] Batched: true,
> :          Format: Parquet, Location:
> :          InMemoryFileIndex[hdfs://hacluster/user/sparkhive/warehouse/x1.db/c],
> :          PartitionFilters: [], PushedFilters: [IsNotNull(a)],
> :          ReadSchema: struct<a:int,b:string>
> +- {color:#d04437}BroadcastExchange{color} HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
>    +- *(1) Project [a#3286, b#3287]
>       +- *(1) Filter isnotnull(a#3286)
>          +- *(1) FileScan parquet x1.cv[a#3286,b#3287] Batched: true,
>               Format: Parquet, Location:
>               InMemoryFileIndex[hdfs://hacluster/user/sparkhive/warehouse/x1.db/cv],
>               PartitionFilters: [], PushedFilters: [IsNotNull(a)],
>               ReadSchema: struct<a:int,b:string>|
> {code:java}
> After Session Restarts (spark-beeline)
> {code}
> {color:#d04437} *explain select * from c , cv where c.a = cv.a;*{color}
> |== Physical Plan ==
> *(5) {color:#d04437}SortMergeJoin{color} [a#3312], [a#3314], Inner
> :- *(2) Sort [a#3312 ASC NULLS FIRST], false, 0
> :  +- Exchange hashpartitioning(a#3312, 200)
> :     +- *(1) Project [a#3312, b#3313]
> :        +- *(1) Filter isnotnull(a#3312)
> :           +- *(1) FileScan parquet x1.c[a#3312,b#3313] Batched: true,
> :                Format: Parquet, Location:
> :                InMemoryFileIndex[hdfs://hacluster/user/sparkhive/warehouse/x1.db/c],
> :                PartitionFilters: [], PushedFilters: [IsNotNull(a)],
> :                ReadSchema: struct<a:int,b:string>
> +- *(4) Sort [a#3314 ASC NULLS FIRST], false, 0
>    +- Exchange hashpartitioning(a#3314, 200)
>       +- *(3) Project [a#3314, b#3315]
>          +- *(3) Filter isnotnull(a#3314)
>             +- *(3) FileScan parquet x1.cv[a#3314,b#3315] Batched: true,
>                  Format: Parquet, Location:
>                  InMemoryFileIndex[hdfs://hacluster/user/sparkhive/warehouse/x1.db/cv],
>                  PartitionFilters: [], PushedFilters: [IsNotNull(a)],
>                  ReadSchema: struct<a:int,b:string>|
> +_*Note: The JDBC server keeps running while the session restarts, i.e. the
> application is not restarted and the driver remains the same.*_+
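
The two EXPLAIN outputs quoted above differ only in the join operator they pick. As a rough aid for comparing such plans across sessions, the following minimal sketch pulls the join operator names out of a plan string. The helper `join_strategies` and the `JOIN_OPERATORS` list are hypothetical names introduced here for illustration, the operator list is an assumption and may not be exhaustive, and the sample strings are shortened from the plans in the report. It does not diagnose why the plan changes.

```python
import re
from typing import List

# Join operator names as they commonly appear in Spark physical plans.
# Assumption: this list is illustrative, not exhaustive.
JOIN_OPERATORS = (
    "BroadcastHashJoin",
    "SortMergeJoin",
    "ShuffledHashJoin",
    "BroadcastNestedLoopJoin",
    "CartesianProduct",
)

# Jira colour macros such as {color:#d04437} and the closing {color}.
_JIRA_COLOR = re.compile(r"\{color(?::#[0-9a-fA-F]{6})?\}")
_JOIN_PATTERN = re.compile(r"\b(" + "|".join(JOIN_OPERATORS) + r")\b")


def join_strategies(plan_text: str) -> List[str]:
    """Return the join operators found in plan_text, in order of appearance."""
    cleaned = _JIRA_COLOR.sub("", plan_text)  # drop Jira markup before matching
    return _JOIN_PATTERN.findall(cleaned)


# Top lines of the two plans from the report (shortened).
before = "*(2) {color:#d04437}BroadcastHashJoin{color}[a#3284], [a#3286], Inner, BuildRight"
after = "*(5) {color:#d04437}SortMergeJoin{color} [a#3312], [a#3314], Inner"

print(join_strategies(before))  # ['BroadcastHashJoin']
print(join_strategies(after))   # ['SortMergeJoin']
```

Running the same helper on plans captured before and after a session restart makes the regression easy to spot in a script, for example by asserting that both calls return the same list.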


