[jira] [Assigned] (SPARK-26576) Broadcast hint not applied to partitioned table

2019-01-10 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-26576:


Assignee: Apache Spark

> Broadcast hint not applied to partitioned table
> ---
>
> Key: SPARK-26576
> URL: https://issues.apache.org/jira/browse/SPARK-26576
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.2.2, 2.3.2, 2.4.0
>Reporter: John Zhuge
>Assignee: Apache Spark
>Priority: Major
>
> The broadcast hint is not applied to a partitioned Parquet table. Below,
> "SortMergeJoin" is chosen incorrectly and "ResolvedHint(broadcast)" is
> removed from the Optimized Logical Plan.
> {noformat}
> scala> spark.sql("CREATE TABLE jzhuge.parquet_with_part (val STRING) 
> PARTITIONED BY (dateint INT) STORED AS parquet")
> scala> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
> scala> Seq(spark.table("jzhuge.parquet_with_part")).map(df => 
> df.join(broadcast(df), "dateint").explain(true))
> == Parsed Logical Plan ==
> 'Join UsingJoin(Inner,List(dateint))
> :- SubqueryAlias `jzhuge`.`parquet_with_part`
> :  +- Relation[val#28,dateint#29] parquet
> +- ResolvedHint (broadcast)
>+- SubqueryAlias `jzhuge`.`parquet_with_part`
>   +- Relation[val#32,dateint#33] parquet
> == Analyzed Logical Plan ==
> dateint: int, val: string, val: string
> Project [dateint#29, val#28, val#32]
> +- Join Inner, (dateint#29 = dateint#33)
>:- SubqueryAlias `jzhuge`.`parquet_with_part`
>:  +- Relation[val#28,dateint#29] parquet
>+- ResolvedHint (broadcast)
>   +- SubqueryAlias `jzhuge`.`parquet_with_part`
>  +- Relation[val#32,dateint#33] parquet
> == Optimized Logical Plan ==
> Project [dateint#29, val#28, val#32]
> +- Join Inner, (dateint#29 = dateint#33)
>:- Project [val#28, dateint#29]
>:  +- Filter isnotnull(dateint#29)
>: +- Relation[val#28,dateint#29] parquet
>+- Project [val#32, dateint#33]
>   +- Filter isnotnull(dateint#33)
>  +- Relation[val#32,dateint#33] parquet
> == Physical Plan ==
> *(5) Project [dateint#29, val#28, val#32]
> +- *(5) SortMergeJoin [dateint#29], [dateint#33], Inner
>:- *(2) Sort [dateint#29 ASC NULLS FIRST], false, 0
>:  +- Exchange(coordinator id: 55629191) hashpartitioning(dateint#29, 500), coordinator[target post-shuffle partition size: 67108864]
>: +- *(1) FileScan parquet jzhuge.parquet_with_part[val#28,dateint#29] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[], PartitionCount: 0, PartitionFilters: [isnotnull(dateint#29)], PushedFilters: [], ReadSchema: struct<val:string>
>+- *(4) Sort [dateint#33 ASC NULLS FIRST], false, 0
>   +- ReusedExchange [val#32, dateint#33], Exchange(coordinator id: 55629191) hashpartitioning(dateint#29, 500), coordinator[target post-shuffle partition size: 67108864]
> {noformat}
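> As a sanity check, here is a minimal REPL sketch (the {{df}} and {{joined}}
> vals are illustrative, not from the original report) that inspects the
> optimized plan directly; given the plans above, the collected sequence
> should come back empty for the partitioned table:
> {noformat}
> scala> import org.apache.spark.sql.catalyst.plans.logical.ResolvedHint
> scala> val df = spark.table("jzhuge.parquet_with_part")
> scala> val joined = df.join(broadcast(df), "dateint")
> scala> // Collect any ResolvedHint nodes that survive optimization; per the
> scala> // Optimized Logical Plan above, none survive here.
> scala> joined.queryExecution.optimizedPlan.collect { case h: ResolvedHint => h }
> {noformat}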
> The broadcast hint is applied to a Parquet table without partitions. Below,
> "BroadcastHashJoin" is chosen as expected.
> {noformat}
> scala> spark.sql("CREATE TABLE jzhuge.parquet_no_part (val STRING, dateint 
> INT) STORED AS parquet")
> scala> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
> scala> Seq(spark.table("jzhuge.parquet_no_part")).map(df => 
> df.join(broadcast(df), "dateint").explain(true))
> == Parsed Logical Plan ==
> 'Join UsingJoin(Inner,List(dateint))
> :- SubqueryAlias `jzhuge`.`parquet_no_part`
> :  +- Relation[val#44,dateint#45] parquet
> +- ResolvedHint (broadcast)
>+- SubqueryAlias `jzhuge`.`parquet_no_part`
>   +- Relation[val#50,dateint#51] parquet
> == Analyzed Logical Plan ==
> dateint: int, val: string, val: string
> Project [dateint#45, val#44, val#50]
> +- Join Inner, (dateint#45 = dateint#51)
>:- SubqueryAlias `jzhuge`.`parquet_no_part`
>:  +- Relation[val#44,dateint#45] parquet
>+- ResolvedHint (broadcast)
>   +- SubqueryAlias `jzhuge`.`parquet_no_part`
>  +- Relation[val#50,dateint#51] parquet
> == Optimized Logical Plan ==
> Project [dateint#45, val#44, val#50]
> +- Join Inner, (dateint#45 = dateint#51)
>:- Filter isnotnull(dateint#45)
>:  +- Relation[val#44,dateint#45] parquet
>+- ResolvedHint (broadcast)
>   +- Filter isnotnull(dateint#51)
>  +- Relation[val#50,dateint#51] parquet
> == Physical Plan ==
> *(2) Project [dateint#45, val#44, val#50]
> +- *(2) BroadcastHashJoin [dateint#45], [dateint#51], Inner, BuildRight
>:- *(2) Project [val#44, dateint#45]
>:  +- *(2) Filter isnotnull(dateint#45)
>: +- *(2) FileScan parquet jzhuge.parquet_no_part[val#44,dateint#45] Batched: true, Format: Parquet, Location: InMemoryFileIndex[...], PartitionFilters: [], PushedFilters: [IsNotNull(dateint)], ReadSchema: struct<val:string,dateint:int>
>+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[1, int, true] as bigint)))
>   +- *(1)
> {noformat}
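> The complementary check on the physical plan (again a sketch; {{df2}} and
> {{joined2}} are illustrative names) confirms the expected operator for the
> non-partitioned table:
> {noformat}
> scala> import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
> scala> val df2 = spark.table("jzhuge.parquet_no_part")
> scala> val joined2 = df2.join(broadcast(df2), "dateint")
> scala> // Expect true here; the same check against jzhuge.parquet_with_part
> scala> // returns false because SortMergeJoin is chosen instead.
> scala> joined2.queryExecution.executedPlan.collect { case b: BroadcastHashJoinExec => b }.nonEmpty
> {noformat}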

[jira] [Assigned] (SPARK-26576) Broadcast hint not applied to partitioned table

2019-01-10 Thread Apache Spark (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-26576:


Assignee: (was: Apache Spark)
