[ https://issues.apache.org/jira/browse/SPARK-26959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Natang updated SPARK-26959:
---------------------------
    Affects Version/s: 2.4.0

> Join of two tables, bucketed the same way, on bucket columns and one or more 
> other columns should not need a shuffle
> --------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-26959
>                 URL: https://issues.apache.org/jira/browse/SPARK-26959
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.2.1, 2.4.0
>            Reporter: Natang
>            Priority: Major
>
> _When two tables that are bucketed the same way are joined on the bucket 
> columns plus one or more other columns, Spark should be able to perform the 
> join without a shuffle._
> Consider the example below. There are two tables, 'join_left_table' and 
> 'join_right_table', both bucketed by 'col1' into 4 buckets. When these tables 
> are joined on 'col1' and 'col2', Spark should be able to do the join without 
> a shuffle. All rows for a given value of 'col1' are in the same bucket in 
> both tables, regardless of the value of 'col2'. So every pair of rows that 
> matches on ('col1', 'col2') already sits in corresponding buckets of the two 
> tables. The plans below show that joining on 'col1' alone avoids the shuffle. 
> Joining on 'col1' and 'col2' instead adds an Exchange (hashpartitioning on 
> both columns) on each side.
>  
> ----
>  
>  
> {noformat}
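> // Run in spark-shell, where spark, sc and spark.implicits._ (needed for toDF) are already available.
> def randomInt1to100 = scala.util.Random.nextInt(100)+1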
> val left = sc.parallelize(
>   Seq.fill(100){(randomInt1to100,randomInt1to100,randomInt1to100)}
> ).toDF("col1", "col2", "col3")
> val right = sc.parallelize(
>   Seq.fill(100){(randomInt1to100,randomInt1to100,randomInt1to100)}
> ).toDF("col1", "col2", "col3")
> import org.apache.spark.sql.SaveMode
> left.write
>     .bucketBy(4,"col1")
>     .sortBy("col1", "col2")
>     .mode(SaveMode.Overwrite)
>     .saveAsTable("join_left_table")
>     
> right.write
>     .bucketBy(4,"col1")
>     .sortBy("col1", "col2")
>     .mode(SaveMode.Overwrite)
>     .saveAsTable("join_right_table")
> val left_table = spark.read.table("join_left_table")
> val right_table = spark.read.table("join_right_table")
> // Disable broadcast joins so that both joins below are planned as sort-merge joins.
> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
> val join_on_col1=left_table.join(
>         right_table,
>         Seq("col1"))
> join_on_col1.explain
> ### BEGIN Output
> join_on_col1: org.apache.spark.sql.DataFrame = [col1: int, col2: int ... 3 more fields]
> == Physical Plan ==
> *Project [col1#250, col2#251, col3#252, col2#258, col3#259]
> +- *SortMergeJoin [col1#250], [col1#257], Inner
>    :- *Sort [col1#250 ASC NULLS FIRST], false, 0
>    :  +- *Project [col1#250, col2#251, col3#252]
>    :     +- *Filter isnotnull(col1#250)
>    :        +- *FileScan parquet default.join_left_table[col1#250,col2#251,col3#252] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-10-0-108-205.ec2.internal:8020/user/spark/warehouse/join_left_table], PartitionFilters: [], PushedFilters: [IsNotNull(col1)], ReadSchema: struct<col1:int,col2:int,col3:int>
>    +- *Sort [col1#257 ASC NULLS FIRST], false, 0
>       +- *Project [col1#257, col2#258, col3#259]
>          +- *Filter isnotnull(col1#257)
>             +- *FileScan parquet default.join_right_table[col1#257,col2#258,col3#259] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-10-0-108-205.ec2.internal:8020/user/spark/warehouse/join_right_table], PartitionFilters: [], PushedFilters: [IsNotNull(col1)], ReadSchema: struct<col1:int,col2:int,col3:int>
> ### END Output
> val join_on_col1_col2=left_table.join(
>         right_table,
>         Seq("col1","col2"))
> join_on_col1_col2.explain
> ### BEGIN Output
> join_on_col1_col2: org.apache.spark.sql.DataFrame = [col1: int, col2: int ... 2 more fields]
> == Physical Plan ==
> *Project [col1#250, col2#251, col3#252, col3#259]
> +- *SortMergeJoin [col1#250, col2#251], [col1#257, col2#258], Inner
>    :- *Sort [col1#250 ASC NULLS FIRST, col2#251 ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(col1#250, col2#251, 200)
>    :     +- *Project [col1#250, col2#251, col3#252]
>    :        +- *Filter (isnotnull(col2#251) && isnotnull(col1#250))
>    :           +- *FileScan parquet default.join_left_table[col1#250,col2#251,col3#252] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-10-0-108-205.ec2.internal:8020/user/spark/warehouse/join_left_table], PartitionFilters: [], PushedFilters: [IsNotNull(col2), IsNotNull(col1)], ReadSchema: struct<col1:int,col2:int,col3:int>
>    +- *Sort [col1#257 ASC NULLS FIRST, col2#258 ASC NULLS FIRST], false, 0
>       +- Exchange hashpartitioning(col1#257, col2#258, 200)
>          +- *Project [col1#257, col2#258, col3#259]
>             +- *Filter (isnotnull(col2#258) && isnotnull(col1#257))
>                +- *FileScan parquet default.join_right_table[col1#257,col2#258,col3#259] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-10-0-108-205.ec2.internal:8020/user/spark/warehouse/join_right_table], PartitionFilters: [], PushedFilters: [IsNotNull(col2), IsNotNull(col1)], ReadSchema: struct<col1:int,col2:int,col3:int>
> ### END Output{noformat}
>  
>  
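
The argument in the description can be checked without Spark. The sketch below is a hypothetical model. `BucketJoinModel` and its methods are illustrative names. `bucketId` is a stand-in (floorMod of col1's `hashCode` over 4 buckets), not Spark's real bucket hash. The argument relies only on the bucket being a function of 'col1' alone. The model joins on ('col1', 'col2') once over the whole tables and once bucket by bucket. The two results should contain the same pairs.

```scala
// Hypothetical model of the bucketed join; not Spark code.
object BucketJoinModel {
  type Row = (Int, Int, Int) // (col1, col2, col3)

  val numBuckets = 4

  // Stand-in bucket function: depends only on the bucket column col1, never on col2.
  // Spark uses its own hash function; this one is for illustration only.
  def bucketId(col1: Int): Int = Math.floorMod(col1.hashCode, numBuckets)

  // Inner join on (col1, col2) over the whole tables (a nested loop, for clarity).
  def globalJoin(left: Seq[Row], right: Seq[Row]): Seq[(Row, Row)] =
    for {
      l <- left
      r <- right
      if l._1 == r._1 && l._2 == r._2
    } yield (l, r)

  // The same join evaluated bucket by bucket: bucket b of the left table is
  // only ever compared with bucket b of the right table, so no data moves.
  def bucketedJoin(left: Seq[Row], right: Seq[Row]): Seq[(Row, Row)] = {
    val leftBuckets = left.groupBy(row => bucketId(row._1))
    val rightBuckets = right.groupBy(row => bucketId(row._1))
    (0 until numBuckets).flatMap { b =>
      globalJoin(leftBuckets.getOrElse(b, Seq.empty), rightBuckets.getOrElse(b, Seq.empty))
    }
  }
}
```

Comparing `bucketedJoin(l, r).sorted` with `globalJoin(l, r).sorted` on random rows shows that restricting the (col1, col2) join to matching buckets loses no pairs.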

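The improvement requested here amounts to a compatibility rule between the existing bucketing and the join keys. The sketch below states one such rule in plain Scala. Both sides must have the same number of buckets. Each side's bucket columns must also be join keys, equated pairwise and in the same order. Any additional join keys, such as 'col2' in the example, do not affect the result. This is a hypothetical model for discussion, not Spark's planner logic. It assumes both tables were bucketed with the same hash function. `Bucketing` and `canAvoidShuffle` are illustrative names.

```scala
// Hypothetical model of the rule requested in this issue; it is NOT Spark's
// planner code, and Bucketing/canAvoidShuffle are illustrative names only.
object BucketCompat {
  final case class Bucketing(columns: Seq[String], numBuckets: Int)

  // True when the existing bucketing of both sides already co-locates every
  // pair of rows that can satisfy the equi-join condition.
  def canAvoidShuffle(
      left: Bucketing,
      right: Bucketing,
      leftKeys: Seq[String],
      rightKeys: Seq[String]): Boolean = {
    require(leftKeys.length == rightKeys.length, "join keys must come in pairs")
    // Each (leftKey, rightKey) pair is one equality in the join condition.
    val equalities: Set[(String, String)] = leftKeys.zip(rightKeys).toSet
    left.numBuckets == right.numBuckets &&
      left.columns.nonEmpty &&
      left.columns.length == right.columns.length &&
      // The i-th bucket column on the left must be equated with the i-th bucket
      // column on the right, so both sides hash the same values in the same order.
      // Extra join keys (such as col2) are allowed and play no part in this check.
      left.columns.zip(right.columns).forall(pair => equalities.contains(pair))
  }
}
```

Under this rule, the example's join on ('col1', 'col2') qualifies. A join on 'col2' alone, or a join between tables with different bucket counts, would still need a shuffle.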

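To see whether a given join still shuffles without reading the whole plan, one option is to count Exchange operators in the plan text. The helper below is a hypothetical convenience, not a Spark API. It only runs a regex over a string, so it works on the `explain` output quoted above as well as on a plan string obtained in spark-shell.

```scala
// Hypothetical helper (not part of Spark's API): counts shuffle Exchange
// operators in the text of a physical plan, such as the output of .explain.
object PlanCheck {
  // \bExchange\b matches "Exchange hashpartitioning(...)" but not
  // "BroadcastExchange", because there is no word boundary inside that token.
  private val ExchangeOp = """\bExchange\b""".r

  def shuffleCount(plan: String): Int = ExchangeOp.findAllIn(plan).size

  def needsShuffle(plan: String): Boolean = shuffleCount(plan) > 0
}
```

In spark-shell, `PlanCheck.shuffleCount(join_on_col1_col2.queryExecution.executedPlan.toString)` would be expected to return 2 for the plan quoted above. The same call on `join_on_col1` would be expected to return 0.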

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
