[ https://issues.apache.org/jira/browse/SPARK-18067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15600824#comment-15600824 ]
Tejas Patil edited comment on SPARK-18067 at 10/24/16 3:35 AM:
---------------------------------------------------------------

[~hvanhovell] : Tagging you since you have context of this portion of the code.

I feel that createPartitioning()'s use of `HashPartitioning` is hurting here. `HashPartitioning` has stricter semantics than we need, and it feels like we could use something else.

More explanation: both children are hash partitioned on `key`. Assume these are the partitions for the children:

||partitions||child 1||child 2||
|partition 0|[0, 0, 0, 3]|[0, 0, 3, 3]|
|partition 1|[1, 4, 4]|[4]|
|partition 2|[2, 2]|[2, 5, 5, 5]|

Since we have __all__ rows with the same value of `key` in a given partition, we can evaluate other join predicates like (`value1` = `value2`) right there, without needing any shuffle. What is currently done, i.e. `HashPartitioning(key, value)`, expects rows with the same value of `pmod(hash(key, value))` to be in the same partition and does not take advantage of the fact that we already have rows with the same `key` packed together.


> Adding filter after SortMergeJoin creates unnecessary shuffle
> -------------------------------------------------------------
>
>                 Key: SPARK-18067
>                 URL: https://issues.apache.org/jira/browse/SPARK-18067
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.1
>            Reporter: Paul Jones
>            Priority: Minor
>
> Basic setup
> {code}
> scala> case class Data1(key: String, value1: Int)
> scala> case class Data2(key: String, value2: Int)
>
> scala> val partition1 = sc.parallelize(1 to 100000).map(x => Data1(s"$x", x))
>   .toDF.repartition(col("key")).sortWithinPartitions(col("key")).cache
> scala> val partition2 = sc.parallelize(1 to 100000).map(x => Data2(s"$x", x))
>   .toDF.repartition(col("key")).sortWithinPartitions(col("key")).cache
> {code}
> Join on key
> {code}
> scala> partition1.join(partition2, "key").explain
>
> == Physical Plan ==
> Project [key#0,value1#1,value2#13]
> +- SortMergeJoin [key#0], [key#12]
>    :- InMemoryColumnarTableScan [key#0,value1#1], InMemoryRelation [key#0,value1#1], true, 10000, StorageLevel(true, true, false, true, 1), Sort [key#0 ASC], false, 0, None
>    +- InMemoryColumnarTableScan [key#12,value2#13], InMemoryRelation [key#12,value2#13], true, 10000, StorageLevel(true, true, false, true, 1), Sort [key#12 ASC], false, 0, None
> {code}
> And we get a super efficient join with no shuffle.
> But if we add a filter, our join gets less efficient and we end up with a shuffle.
> {code}
> scala> partition1.join(partition2, "key").filter($"value1" === $"value2").explain
>
> == Physical Plan ==
> Project [key#0,value1#1,value2#13]
> +- SortMergeJoin [value1#1,key#0], [value2#13,key#12]
>    :- Sort [value1#1 ASC,key#0 ASC], false, 0
>    :  +- TungstenExchange hashpartitioning(value1#1,key#0,200), None
>    :     +- InMemoryColumnarTableScan [key#0,value1#1], InMemoryRelation [key#0,value1#1], true, 10000, StorageLevel(true, true, false, true, 1), Sort [key#0 ASC], false, 0, None
>    +- Sort [value2#13 ASC,key#12 ASC], false, 0
>       +- TungstenExchange hashpartitioning(value2#13,key#12,200), None
>          +- InMemoryColumnarTableScan [key#12,value2#13], InMemoryRelation [key#12,value2#13], true, 10000, StorageLevel(true, true, false, true, 1), Sort [key#12 ASC], false, 0, None
> {code}
> And we can avoid the shuffle if we use a filter expression that can't be pushed into the join.
> {code}
> scala> partition1.join(partition2, "key").filter($"value1" >= $"value2").explain
>
> == Physical Plan ==
> Project [key#0,value1#1,value2#13]
> +- Filter (value1#1 >= value2#13)
>    +- SortMergeJoin [key#0], [key#12]
>       :- InMemoryColumnarTableScan [key#0,value1#1], InMemoryRelation [key#0,value1#1], true, 10000, StorageLevel(true, true, false, true, 1), Sort [key#0 ASC], false, 0, None
>       +- InMemoryColumnarTableScan [key#12,value2#13], InMemoryRelation [key#12,value2#13], true, 10000, StorageLevel(true, true, false, true, 1), Sort [key#12 ASC], false, 0, None
> {code}
> What's the best way to avoid the filter pushdown here?
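One sketch of a workaround, following the pattern of the `>=` example above (an assumption only; the resulting plan has not been verified against the 1.6.1 planner): rewrite the equality so that it cannot be extracted as a second equi-join key, leaving `key` as the only join key.

{code}
// Hypothetical workaround sketch, reusing partition1 / partition2 from the setup above.
import org.apache.spark.sql.functions.col

// Written as (value1 - value2) === 0, the left-hand expression references columns
// from both sides of the join, so the planner's equi-join key extraction cannot
// split it into a (left key, right key) pair. The join keys should then stay as
// [key], the predicate should be evaluated on top of SortMergeJoin, and the
// existing hash partitioning on `key` should be reused with no extra
// TungstenExchange (assumption: no optimizer rule rewrites it back to
// value1 === value2).
val result = partition1.join(partition2, "key")
  .filter((col("value1") - col("value2")) === 0)

result.explain()
{code}

If that assumption holds, the plan should resemble the `>=` case above: the equality evaluated as a condition over SortMergeJoin [key#0], [key#12], with no shuffle on (value, key).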