Natang updated SPARK-26959:
---------------------------
Affects Version/s: 2.4.0

Join of two tables, bucketed the same way, on bucket columns and one or more other columns should not need a shuffle
---------------------------------------------------------------------------------------------------------------------

                Key: SPARK-26959
                URL: https://issues.apache.org/jira/browse/SPARK-26959
            Project: Spark
         Issue Type: Improvement
         Components: SQL
   Affects Versions: 2.2.1, 2.4.0
           Reporter: Natang
           Priority: Major

_When two tables that are bucketed the same way are joined on the bucket columns plus one or more other columns, Spark should be able to perform the join without doing a shuffle._

Consider the example below. Two tables, 'join_left_table' and 'join_right_table', are each bucketed by 'col1' into 4 buckets. When these tables are joined on 'col1' and 'col2', Spark should be able to do the join without a shuffle: all entries for a given value of 'col1' are in the same bucket of both tables, irrespective of the values of 'col2'.

----

{noformat}
def randomInt1to100 = scala.util.Random.nextInt(100) + 1

val left = sc.parallelize(
  Seq.fill(100){ (randomInt1to100, randomInt1to100, randomInt1to100) }
).toDF("col1", "col2", "col3")
val right = sc.parallelize(
  Seq.fill(100){ (randomInt1to100, randomInt1to100, randomInt1to100) }
).toDF("col1", "col2", "col3")

import org.apache.spark.sql.SaveMode

left.write
  .bucketBy(4, "col1")
  .sortBy("col1", "col2")
  .mode(SaveMode.Overwrite)
  .saveAsTable("join_left_table")

right.write
  .bucketBy(4, "col1")
  .sortBy("col1", "col2")
  .mode(SaveMode.Overwrite)
  .saveAsTable("join_right_table")

val left_table = spark.read.table("join_left_table")
val right_table = spark.read.table("join_right_table")

// Disable broadcast joins so the sort-merge join path is exercised.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

val join_on_col1 = left_table.join(
  right_table,
  Seq("col1"))
join_on_col1.explain
{noformat}

Joining on 'col1' alone uses the bucketing and needs no Exchange:

{noformat}
join_on_col1: org.apache.spark.sql.DataFrame = [col1: int, col2: int ... 3 more fields]

== Physical Plan ==
*Project [col1#250, col2#251, col3#252, col2#258, col3#259]
+- *SortMergeJoin [col1#250], [col1#257], Inner
   :- *Sort [col1#250 ASC NULLS FIRST], false, 0
   :  +- *Project [col1#250, col2#251, col3#252]
   :     +- *Filter isnotnull(col1#250)
   :        +- *FileScan parquet default.join_left_table[col1#250,col2#251,col3#252] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-10-0-108-205.ec2.internal:8020/user/spark/warehouse/join_left_table], PartitionFilters: [], PushedFilters: [IsNotNull(col1)], ReadSchema: struct<col1:int,col2:int,col3:int>
   +- *Sort [col1#257 ASC NULLS FIRST], false, 0
      +- *Project [col1#257, col2#258, col3#259]
         +- *Filter isnotnull(col1#257)
            +- *FileScan parquet default.join_right_table[col1#257,col2#258,col3#259] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-10-0-108-205.ec2.internal:8020/user/spark/warehouse/join_right_table], PartitionFilters: [], PushedFilters: [IsNotNull(col1)], ReadSchema: struct<col1:int,col2:int,col3:int>
{noformat}
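For reference, the shuffle decision comes down to how the planner matches each scan's output partitioning against the join's required distribution. The sketch below is an editor's illustration written against Spark 2.4's internal Catalyst classes (HashPartitioning and HashClusteredDistribution are internal APIs and may differ in other versions), not code from the report:

{noformat}
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.physical.{HashClusteredDistribution, HashPartitioning}
import org.apache.spark.sql.types.IntegerType

// Stand-ins for the scanned columns of the bucketed tables.
val col1 = AttributeReference("col1", IntegerType)()
val col2 = AttributeReference("col2", IntegerType)()

// A scan of a table bucketed by col1 into 4 buckets reports this partitioning.
val bucketed = HashPartitioning(Seq(col1), 4)

// Join on col1 alone: the partitioning expressions match the required
// hash-clustering exactly, so EnsureRequirements inserts no Exchange.
bucketed.satisfies(HashClusteredDistribution(Seq(col1)))        // true

// Join on (col1, col2): the check demands an exact match on *all* join keys,
// so it fails and an Exchange is inserted (see the second plan below), even
// though rows sharing a col1 value are already co-located regardless of col2.
bucketed.satisfies(HashClusteredDistribution(Seq(col1, col2)))  // false
{noformat}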
{noformat}
val join_on_col1_col2 = left_table.join(
  right_table,
  Seq("col1", "col2"))
join_on_col1_col2.explain
{noformat}

Joining on 'col1' and 'col2' shuffles both sides:

{noformat}
join_on_col1_col2: org.apache.spark.sql.DataFrame = [col1: int, col2: int ... 2 more fields]

== Physical Plan ==
*Project [col1#250, col2#251, col3#252, col3#259]
+- *SortMergeJoin [col1#250, col2#251], [col1#257, col2#258], Inner
   :- *Sort [col1#250 ASC NULLS FIRST, col2#251 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(col1#250, col2#251, 200)
   :     +- *Project [col1#250, col2#251, col3#252]
   :        +- *Filter (isnotnull(col2#251) && isnotnull(col1#250))
   :           +- *FileScan parquet default.join_left_table[col1#250,col2#251,col3#252] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-10-0-108-205.ec2.internal:8020/user/spark/warehouse/join_left_table], PartitionFilters: [], PushedFilters: [IsNotNull(col2), IsNotNull(col1)], ReadSchema: struct<col1:int,col2:int,col3:int>
   +- *Sort [col1#257 ASC NULLS FIRST, col2#258 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(col1#257, col2#258, 200)
         +- *Project [col1#257, col2#258, col3#259]
            +- *Filter (isnotnull(col2#258) && isnotnull(col1#257))
               +- *FileScan parquet default.join_right_table[col1#257,col2#258,col3#259] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://ip-10-0-108-205.ec2.internal:8020/user/spark/warehouse/join_right_table], PartitionFilters: [], PushedFilters: [IsNotNull(col2), IsNotNull(col1)], ReadSchema: struct<col1:int,col2:int,col3:int>
{noformat}
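Until Spark can use bucketing when the join keys are a superset of the bucket columns, one workaround (an editor's sketch, not part of the original report; the table names are illustrative) is to bucket by the full join key set when it is known at write time, at the cost of losing the shuffle-free single-column join on 'col1':

{noformat}
// Hypothetical workaround: bucket on both join columns so the scans report
// HashPartitioning(col1, col2) and the two-column join needs no Exchange.
left.write
  .bucketBy(4, "col1", "col2")
  .sortBy("col1", "col2")
  .mode(SaveMode.Overwrite)
  .saveAsTable("join_left_table_2col")

right.write
  .bucketBy(4, "col1", "col2")
  .sortBy("col1", "col2")
  .mode(SaveMode.Overwrite)
  .saveAsTable("join_right_table_2col")

spark.read.table("join_left_table_2col")
  .join(spark.read.table("join_right_table_2col"), Seq("col1", "col2"))
  .explain  // no Exchange expected on either side
{noformat}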