[ https://issues.apache.org/jira/browse/SPARK-17570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15497729#comment-15497729 ]

Tejas Patil commented on SPARK-17570:
-------------------------------------

[~cloud_fan] , [~hvanhovell] : I have been looking into the code to figure out 
a way to make this change. This is what I have so far, but I am not sure it is 
the best way to do this:
1. Add a way to mutate the `outputPartitioning` of SparkPlan nodes.
2. While creating the physical plan [0], when we come across a case where this 
optimization applies, mutate the child subtree's output partitioning from 
`HashPartitioning(expression, buckets = x * y)` to 
`HashPartitioning(expression, buckets = y)`, where `y` is the desired number of 
buckets in the output table of the sort merge join.
3. When the bucketed RDD is created [1], pack multiple buckets of the input 
table into the same `FilePartition`.

#2 needs to visit and alter all the nodes of a subtree after the 
`outputPartitioning` of the subtree's root is changed. Are you OK with this 
approach, or do you have better ideas?
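To spell out the invariant behind #2 and #3, here is a standalone sketch (plain Python, not Spark code; `pack_buckets` is a hypothetical helper) of why coarsening `HashPartitioning(x * y)` to `HashPartitioning(y)` is sound: a row in fine bucket `b` has `hash % (x * y) == b`, which implies `hash % y == b % y` whenever `y` divides `x * y`, so packing all fine buckets that agree modulo `y` reproduces the coarser partitioning:

```python
def pack_buckets(n, y):
    """Group fine bucket ids 0..n-1 by the coarse bucket they map to.

    Sound because hash % n == b implies hash % y == b % y when y divides n:
    write hash = q*n + b and n = k*y, then hash % y == b % y.
    """
    assert n % y == 0, "only valid when y divides n"
    groups = {out: [] for out in range(y)}
    for b in range(n):
        groups[b % y].append(b)
    return groups

# An 8-bucket table read as 4 coarse partitions:
# pack_buckets(8, 4) -> {0: [0, 4], 1: [1, 5], 2: [2, 6], 3: [3, 7]}
```

The grouping in the dict values is exactly what step 3 would put into a single `FilePartition`.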

[0] : 
https://github.com/apache/spark/blob/aaf632b2132750c697dddd0469b902d9308dbf36/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala#L189
[1] : 
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L417

> Avoid Hash and Exchange in Sort Merge join if bucketing factor is multiple 
> for tables
> -------------------------------------------------------------------------------------
>
>                 Key: SPARK-17570
>                 URL: https://issues.apache.org/jira/browse/SPARK-17570
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.0
>            Reporter: Tejas Patil
>            Priority: Minor
>
> In case of bucketed tables, Spark will avoid doing `Sort` and `Exchange` if 
> the input tables and the output table have the same number of buckets. 
> However, unequal bucketing always leads to `Sort` and `Exchange`. If the 
> number of buckets in the output table is a factor of the number of buckets 
> in the input tables, we should be able to avoid `Sort` and `Exchange` and 
> directly join those.
> eg.
> Assume Input1, Input2 and Output are bucketed + sorted tables over the same 
> columns but with different numbers of buckets: Input1 has 8 buckets, Input2 
> has 12 buckets and Output has 4 buckets. Since hash-partitioning is done 
> using modulus, if we JOIN buckets (0, 4) of Input1 and buckets (0, 4, 8) of 
> Input2 in the same task, it would give bucket 0 of the output table.
> {noformat}
> Input1   (0, 4)      (1, 5)      (2, 6)       (3, 7)
> Input2   (0, 4, 8)   (1, 5, 9)   (2, 6, 10)   (3, 7, 11)
> Output   (0)         (1)         (2)          (3)
> {noformat}
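The modular-arithmetic claim in the quoted example can be checked with a tiny standalone script (plain Python with small integer keys standing in for hashed values; not Spark code), assuming Input1 has 8 buckets and Input2 has 12, as the bucket ids in the table suggest:

```python
def bucket(key, n):
    # Stand-in for hash partitioning; identity hash works for small ints.
    return key % n

for k in range(100):
    b1 = bucket(k, 8)    # Input1 bucket for this key
    b2 = bucket(k, 12)   # Input2 bucket for this key
    # Both sides land in buckets handled by the same join task,
    # and that task corresponds to one bucket of the 4-bucket output.
    assert b1 % 4 == b2 % 4 == bucket(k, 4)
```

So every pair of matching rows meets inside exactly one task, and each task's result is one output bucket.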


