[
https://issues.apache.org/jira/browse/SPARK-18067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15600824#comment-15600824
]
Tejas Patil edited comment on SPARK-18067 at 10/24/16 3:35 AM:
---------------------------------------------------------------
[~hvanhovell] : Tagging you since you have context of this portion of the code.
I feel that createPartitioning()'s use of `HashPartitioning` is hurting here.
`HashPartitioning` has stricter semantics than this case needs, so it feels
like we could use something else.
More explanation:
Both children are hash partitioned on `key`. Assume these are the partitions
for the children:
||partitions||child 1||child 2||
|partition 0|[0, 0, 0, 3]|[0, 0, 3, 3]|
|partition 1|[1, 4, 4]|[4]|
|partition 2|[2, 2]|[2, 5, 5, 5]|
Since *all* rows with the same value of `key` are in the same partition on
both sides, we can evaluate other join predicates like (`value1` = `value2`)
right there without needing any shuffle.
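To make this concrete, here is a minimal plain-Scala sketch (no Spark involved). The keys follow the partition table above; the values, the `Row` alias and the `LocalJoinSketch` object are hypothetical and exist only for illustration. It joins partition i of child 1 with partition i of child 2 on both `key` and `value`, and compares that with a join over all rows.

```scala
// Hypothetical rows of the form (key, value). Keys match the table above,
// values are made up for this illustration.
object LocalJoinSketch {
  type Row = (Int, Int)

  // Partition layout taken from the table: both children are partitioned on key.
  val child1: Vector[Seq[Row]] = Vector(
    Seq((0, 10), (0, 11), (0, 12), (3, 30)),
    Seq((1, 10), (4, 40), (4, 41)),
    Seq((2, 20), (2, 21))
  )
  val child2: Vector[Seq[Row]] = Vector(
    Seq((0, 10), (0, 12), (3, 30), (3, 31)),
    Seq((4, 41)),
    Seq((2, 21), (5, 50), (5, 51), (5, 52))
  )

  // Nested-loop join on key AND value over two collections of rows.
  def join(left: Seq[Row], right: Seq[Row]): Seq[(Int, Int, Int)] =
    for {
      (k1, v1) <- left
      (k2, v2) <- right
      if k1 == k2 && v1 == v2
    } yield (k1, v1, v2)

  // Partition-local evaluation: partition i is only ever joined with partition i.
  val local: Seq[(Int, Int, Int)] =
    child1.zip(child2).flatMap { case (l, r) => join(l, r) }

  // Reference result: join over all rows, ignoring partition boundaries.
  val global: Seq[(Int, Int, Int)] = join(child1.flatten, child2.flatten)
}
```

With this data, `LocalJoinSketch.local` and `LocalJoinSketch.global` contain the same matches, because a row can only match rows that share its `key`, and those all sit in the same partition index on the other side.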
What is currently being done, i.e. `HashPartitioning(key, value)`, expects rows
with the same value of `pmod(hash(key, value), numPartitions)` to be in the
same partition, and does not take advantage of the fact that rows with the
same `key` are already packed together.
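A rough sketch of why the two layouts disagree, in plain Scala. The hash functions below are stand-ins chosen so the arithmetic is easy to follow; they are an assumption and not the hash Spark uses, and `PartitionIdSketch` is a hypothetical name. Three rows share `key` 0 but have different values.

```scala
object PartitionIdSketch {
  // Non-negative modulo, in the spirit of the pmod mentioned above.
  def pmod(a: Int, n: Int): Int = ((a % n) + n) % n

  // Stand-in hash functions (assumption: NOT Spark's actual hash).
  def hashKey(key: Int): Int = key
  def hashKeyValue(key: Int, value: Int): Int = 31 * key + value

  val numPartitions = 3

  // Hypothetical (key, value) rows that all share key 0.
  val rows: Seq[(Int, Int)] = Seq((0, 10), (0, 11), (0, 12))

  // Partition ids when partitioning on key only.
  val byKey: Set[Int] =
    rows.map { case (k, _) => pmod(hashKey(k), numPartitions) }.toSet

  // Partition ids when partitioning on (key, value).
  val byKeyValue: Set[Int] =
    rows.map { case (k, v) => pmod(hashKeyValue(k, v), numPartitions) }.toSet
}
```

Under these stand-in hashes the three rows stay in one partition when partitioned on `key`, but are spread over three partitions when partitioned on (`key`, `value`), so data that is already laid out by `key` would have to be moved to satisfy the second layout.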
> Adding filter after SortMergeJoin creates unnecessary shuffle
> -------------------------------------------------------------
>
> Key: SPARK-18067
> URL: https://issues.apache.org/jira/browse/SPARK-18067
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.6.1
> Reporter: Paul Jones
> Priority: Minor
>
> Basic setup
> {code}
> scala> case class Data1(key: String, value1: Int)
> scala> case class Data2(key: String, value2: Int)
> scala> val partition1 = sc.parallelize(1 to 100000).map(x => Data1(s"$x", x))
> .toDF.repartition(col("key")).sortWithinPartitions(col("key")).cache
> scala> val partition2 = sc.parallelize(1 to 100000).map(x => Data2(s"$x", x))
> .toDF.repartition(col("key")).sortWithinPartitions(col("key")).cache
> {code}
> Join on key
> {code}
> scala> partition1.join(partition2, "key").explain
> == Physical Plan ==
> Project [key#0,value1#1,value2#13]
> +- SortMergeJoin [key#0], [key#12]
> :- InMemoryColumnarTableScan [key#0,value1#1], InMemoryRelation
> [key#0,value1#1], true, 10000, StorageLevel(true, true, false, true, 1), Sort
> [key#0 ASC], false, 0, None
> +- InMemoryColumnarTableScan [key#12,value2#13], InMemoryRelation
> [key#12,value2#13], true, 10000, StorageLevel(true, true, false, true, 1),
> Sort [key#12 ASC], false, 0, None
> {code}
> And we get a super efficient join with no shuffle.
> But if we add a filter our join gets less efficient and we end up with a
> shuffle.
> {code}
> scala> partition1.join(partition2, "key").filter($"value1" ===
> $"value2").explain
> == Physical Plan ==
> Project [key#0,value1#1,value2#13]
> +- SortMergeJoin [value1#1,key#0], [value2#13,key#12]
> :- Sort [value1#1 ASC,key#0 ASC], false, 0
> : +- TungstenExchange hashpartitioning(value1#1,key#0,200), None
> : +- InMemoryColumnarTableScan [key#0,value1#1], InMemoryRelation
> [key#0,value1#1], true, 10000, StorageLevel(true, true, false, true, 1), Sort
> [key#0 ASC], false, 0, None
> +- Sort [value2#13 ASC,key#12 ASC], false, 0
> +- TungstenExchange hashpartitioning(value2#13,key#12,200), None
> +- InMemoryColumnarTableScan [key#12,value2#13], InMemoryRelation
> [key#12,value2#13], true, 10000, StorageLevel(true, true, false, true, 1),
> Sort [key#12 ASC], false, 0, None
> {code}
> And we can avoid the shuffle if we use a filter statement that can't be
> pushed into the join.
> {code}
> scala> partition1.join(partition2, "key").filter($"value1" >=
> $"value2").explain
> == Physical Plan ==
> Project [key#0,value1#1,value2#13]
> +- Filter (value1#1 >= value2#13)
> +- SortMergeJoin [key#0], [key#12]
> :- InMemoryColumnarTableScan [key#0,value1#1], InMemoryRelation
> [key#0,value1#1], true, 10000, StorageLevel(true, true, false, true, 1), Sort
> [key#0 ASC], false, 0, None
> +- InMemoryColumnarTableScan [key#12,value2#13], InMemoryRelation
> [key#12,value2#13], true, 10000, StorageLevel(true, true, false, true, 1),
> Sort [key#12 ASC], false, 0, None
> {code}
> What's the best way to avoid the filter pushdown here?