[ https://issues.apache.org/jira/browse/SPARK-20313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16148463#comment-16148463 ]

Tejas Patil commented on SPARK-20313:
-------------------------------------

I tried to replicate what you shared in the JIRA but don't see anything wrong 
with what the planner is doing. Comparing both approaches, `SortMergeJoin` is 
always picked. The second approach joins the individual partitions one by one 
and then unions the results. Depending on your data size and configs, a hash 
based join may have been used in your case, which would explain why the latter 
approach is faster.
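
If you want to double-check which strategy the planner picks on your end, a 
minimal sketch along these lines prints the chosen physical plan and lets you 
toggle broadcast joins (it reuses the `hc` session and the table/column names 
from the plans below; the `small1`/`small2` names are just for the example):
{noformat}
import org.apache.spark.sql.functions

// Look at one partition's join in isolation and print its physical plan;
// check whether it says BroadcastHashJoin or SortMergeJoin.
val small1 = hc.sql("SELECT * FROM bucketed_partitioned_1").filter(functions.col("ds") === "1111-11-11")
val small2 = hc.sql("SELECT * FROM bucketed_partitioned_2").filter(functions.col("ds") === "1111-11-11")
small1.join(small2, Seq("user_id")).explain(true)

// Broadcast joins kick in when one side is estimated below
// spark.sql.autoBroadcastJoinThreshold (10 MB by default). Disable it to
// compare the two approaches on an equal footing:
hc.sql("SET spark.sql.autoBroadcastJoinThreshold=-1")

// Or force a broadcast of one side explicitly:
small1.join(functions.broadcast(small2), Seq("user_id")).explain(true)
{noformat}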

Approach #1
{noformat}
val df1 = hc.sql("SELECT * FROM bucketed_partitioned_1").filter(functions.col("ds").between("1", "5"))
val df2 = hc.sql("SELECT * FROM bucketed_partitioned_2").filter(functions.col("ds").between("1", "5"))
val df3 = df1.join(df2, Seq("ds", "user_id"))
df3.explain(true)

== Physical Plan ==
*Project [ds#38, user_id#36, name#37, name#45]
+- *SortMergeJoin [ds#38, user_id#36], [ds#46, user_id#44], Inner
   :- *Sort [ds#38 ASC NULLS FIRST, user_id#36 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(ds#38, user_id#36, 200)
   :     +- *Filter isnotnull(user_id#36)
   :        +- HiveTableScan [user_id#36, name#37, ds#38], HiveTableRelation `default`.`bucketed_partitioned_1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [user_id#36, name#37], [ds#38], [isnotnull(ds#38), (ds#38 >= 1), (ds#38 <= 5)]
   +- *Sort [ds#46 ASC NULLS FIRST, user_id#44 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(ds#46, user_id#44, 200)
         +- *Filter isnotnull(user_id#44)
            +- HiveTableScan [user_id#44, name#45, ds#46], HiveTableRelation `default`.`bucketed_partitioned_2`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [user_id#44, name#45], [ds#46], [isnotnull(ds#46), (ds#46 >= 1), (ds#46 <= 5)]
{noformat}

Approach #2
{noformat}
val df1 = hc.sql("SELECT * FROM bucketed_partitioned_1")
val df2 = hc.sql("SELECT * FROM bucketed_partitioned_2")
val dsValues = Seq("1111-11-11", "4444-44-44")
val df3 = dsValues.map(dsValue => {
    val df1filtered = df1.filter(functions.col("ds") === dsValue)
    val df2filtered = df2.filter(functions.col("ds") === dsValue)
    df1filtered.join(df2filtered, Seq("user_id")) // ds removed from the join keys
}).reduce(_ union _)
df3.explain(true)


== Physical Plan ==
Union
:- *Project [user_id#63, name#64, ds#65, name#71, ds#72]
:  +- *SortMergeJoin [user_id#63], [user_id#70], Inner
:     :- *Sort [user_id#63 ASC NULLS FIRST], false, 0
:     :  +- Exchange hashpartitioning(user_id#63, 200)
:     :     +- *Filter isnotnull(user_id#63)
:     :        +- HiveTableScan [user_id#63, name#64, ds#65], HiveTableRelation `default`.`bucketed_partitioned_1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [user_id#63, name#64], [ds#65], [isnotnull(ds#65), (ds#65 = 1111-11-11)]
:     +- *Sort [user_id#70 ASC NULLS FIRST], false, 0
:        +- Exchange hashpartitioning(user_id#70, 200)
:           +- *Filter isnotnull(user_id#70)
:              +- HiveTableScan [user_id#70, name#71, ds#72], HiveTableRelation `default`.`bucketed_partitioned_2`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [user_id#70, name#71], [ds#72], [isnotnull(ds#72), (ds#72 = 1111-11-11)]
+- *Project [user_id#63, name#64, ds#65, name#71, ds#72]
   +- *SortMergeJoin [user_id#63], [user_id#70], Inner
      :- *Sort [user_id#63 ASC NULLS FIRST], false, 0
      :  +- Exchange hashpartitioning(user_id#63, 200)
      :     +- *Filter isnotnull(user_id#63)
      :        +- HiveTableScan [user_id#63, name#64, ds#65], HiveTableRelation `default`.`bucketed_partitioned_1`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [user_id#63, name#64], [ds#65], [isnotnull(ds#65), (ds#65 = 4444-44-44)]
      +- *Sort [user_id#70 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(user_id#70, 200)
            +- *Filter isnotnull(user_id#70)
               +- HiveTableScan [user_id#70, name#71, ds#72], HiveTableRelation `default`.`bucketed_partitioned_2`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [user_id#70, name#71], [ds#72], [isnotnull(ds#72), (ds#72 = 4444-44-44)]
{noformat} 
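
As an aside, both plans still contain an Exchange hashpartitioning step, i.e. 
the Hive bucketing metadata is not being used to avoid the shuffle. As a rough 
sketch only (the `_spark` table names and the bucket count 8 are made-up 
example values), rewriting the inputs as Spark-native bucketed tables would 
let a join on the bucketing column plan without the Exchange:
{noformat}
// Sketch only: DataFrameWriter.bucketBy/sortBy work with saveAsTable (Spark 2.0+).
hc.table("bucketed_partitioned_1")
  .write
  .bucketBy(8, "user_id")   // 8 is an arbitrary example bucket count
  .sortBy("user_id")
  .saveAsTable("bucketed_partitioned_1_spark")

hc.table("bucketed_partitioned_2")
  .write
  .bucketBy(8, "user_id")
  .sortBy("user_id")
  .saveAsTable("bucketed_partitioned_2_spark")

// With matching bucket counts and join keys, the SortMergeJoin should plan
// without an Exchange on either side; verify with explain.
hc.table("bucketed_partitioned_1_spark")
  .join(hc.table("bucketed_partitioned_2_spark"), Seq("user_id"))
  .explain(true)
{noformat}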

> Possible lack of join optimization when partitions are in the join condition
> ----------------------------------------------------------------------------
>
>                 Key: SPARK-20313
>                 URL: https://issues.apache.org/jira/browse/SPARK-20313
>             Project: Spark
>          Issue Type: Improvement
>          Components: Optimizer
>    Affects Versions: 2.1.0
>            Reporter: Albert Meltzer
>
> Given two tables T1 and T2, partitioned on column part1, the following have 
> vastly different execution performance:
> // initial, slow
> {noformat}
> val df1 = // load data from T1
>   .filter(functions.col("part1").between("val1", "val2"))
> val df2 = // load data from T2
>   .filter(functions.col("part1").between("val1", "val2"))
> val df3 = df1.join(df2, Seq("part1", "col1"))
> {noformat}
> // manually optimized, considerably faster
> {noformat}
> val df1 = // load data from T1
> val df2 = // load data from T2
> val part1values = Seq(...) // a collection of values between val1 and val2
> val df3 = part1values
>   .map(part1value => {
>     val df1filtered = df1.filter(functions.col("part1") === part1value)
>     val df2filtered = df2.filter(functions.col("part1") === part1value)
>     df1filtered.join(df2filtered, Seq("col1")) // part1 removed from join
>   })
>   .reduce(_ union _)
> {noformat}


