[ https://issues.apache.org/jira/browse/SPARK-18067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16352594#comment-16352594 ]

Eyal Farago commented on SPARK-18067:
-------------------------------------

[~tejasp], this issue and its associated pull-request seem to relax the 
distribution requirements of the top-level join.

I have the opposite case: the join has a single key, but one of its children 
aggregates on two keys (one of which is the join key).

 

Spark 2.1 plans this (suboptimally?) with two Exchange operators: one for the 
aggregate, based on both keys, and another for the join, based on the single key.

 

I think that in this case the planner could relax the distribution requirements 
of the aggregation (the child) and rely on the fact that distributing by one key 
implies distributing by both keys: any two rows sharing (key1, key2) also share 
key1, so a key1-only partitioning already co-locates every (key1, key2) group 
(with the caveats that data might get skewed, we may need to spill during 
aggregation, etc.).

 

A few questions:
 # is there any specific reason your approach is limited to joins rather than 
general cases of 'stacked' exchange operators?
 # do you think there's room for a second strategy that relaxes the distribution 
requirements of the inner exchange (the aggregate in my case)?

I'm also attaching an example plan I'm currently getting (slightly modified):
{noformat}
== Physical Plan ==
*Project [v1 AS v1#472, v2#311L AS v2#473L, v3#312L AS v3#474L, key1#355L AS 
k1#475L, key2#385 AS key2#477]
+- *SortMergeJoin [key1#355L], [key1#304L], Inner
  :- *Sort [key1#355L ASC NULLS FIRST], false, 0
  :  +- Exchange hashpartitioning(key1#355L, 8)
  :     +- *HashAggregate(keys=[key1#355L, key2#385], functions=[], 
output=[key1#355L, key2#385])
  :        +- Exchange hashpartitioning(key1#355L, key2#385, 8)
  :           +- *HashAggregate(keys=[key1#355L, key2#385], functions=[], 
output=[key1#355L, key2#385])
  :              +- *Project [key1#355L, UDF(v4#365) AS key2#385]
  :                 +- *Filter ((isnotnull(key1#355L) && NOT (key1#355L = -1)) 
&& v4#365 IN (s1,s2,s3,s4))
  :                    +- *FileScan parquet [key1#355L,v4#365,commitUUID#366] 
Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:    
mp/loads_only_samples_in_comparison_list5387414497663000634/008d9ea9-a0c..., 
PartitionCount: 1, PartitionFilters: [commitUUID#366 IN 
(oldCommit,commit-uuid)], PushedFilters: [IsNotNull(key1), 
Not(EqualTo(key1,-1)), In(v4, [s,s2,s3..., ReadSchema: 
struct<key1:bigint,v4:string>
  +- *Sort [key1#304L ASC NULLS FIRST], false, 0
     +- Exchange hashpartitioning(key1#304L, 8)
        +- *Project [key1#304L, v1, v2#311L, v3#312L]
{noformat}

Notice the two Exchange operators in the join's first child.

> SortMergeJoin adds shuffle if join predicates have non partitioned columns
> --------------------------------------------------------------------------
>
>                 Key: SPARK-18067
>                 URL: https://issues.apache.org/jira/browse/SPARK-18067
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>    Affects Versions: 1.6.1
>            Reporter: Paul Jones
>            Priority: Minor
>
> Basic setup
> {code}
> scala> case class Data1(key: String, value1: Int)
> scala> case class Data2(key: String, value2: Int)
> scala> val partition1 = sc.parallelize(1 to 100000).map(x => Data1(s"$x", x))
>     .toDF.repartition(col("key")).sortWithinPartitions(col("key")).cache
> scala> val partition2 = sc.parallelize(1 to 100000).map(x => Data2(s"$x", x))
>     .toDF.repartition(col("key")).sortWithinPartitions(col("key")).cache
> {code}
> Join on key
> {code}
> scala> partition1.join(partition2, "key").explain
> == Physical Plan ==
> Project [key#0,value1#1,value2#13]
> +- SortMergeJoin [key#0], [key#12]
>    :- InMemoryColumnarTableScan [key#0,value1#1], InMemoryRelation 
> [key#0,value1#1], true, 10000, StorageLevel(true, true, false, true, 1), Sort 
> [key#0 ASC], false, 0, None
>    +- InMemoryColumnarTableScan [key#12,value2#13], InMemoryRelation 
> [key#12,value2#13], true, 10000, StorageLevel(true, true, false, true, 1), 
> Sort [key#12 ASC], false, 0, None
> {code}
> And we get a super efficient join with no shuffle.
> But if we add a filter our join gets less efficient and we end up with a 
> shuffle.
> {code}
> scala> partition1.join(partition2, "key").filter($"value1" === 
> $"value2").explain
> == Physical Plan ==
> Project [key#0,value1#1,value2#13]
> +- SortMergeJoin [value1#1,key#0], [value2#13,key#12]
>    :- Sort [value1#1 ASC,key#0 ASC], false, 0
>    :  +- TungstenExchange hashpartitioning(value1#1,key#0,200), None
>    :     +- InMemoryColumnarTableScan [key#0,value1#1], InMemoryRelation 
> [key#0,value1#1], true, 10000, StorageLevel(true, true, false, true, 1), Sort 
> [key#0 ASC], false, 0, None
>    +- Sort [value2#13 ASC,key#12 ASC], false, 0
>       +- TungstenExchange hashpartitioning(value2#13,key#12,200), None
>          +- InMemoryColumnarTableScan [key#12,value2#13], InMemoryRelation 
> [key#12,value2#13], true, 10000, StorageLevel(true, true, false, true, 1), 
> Sort [key#12 ASC], false, 0, None
> {code}
> And we can avoid the shuffle if we use a filter statement that can't be pushed 
> into the join.
> {code}
> scala> partition1.join(partition2, "key").filter($"value1" >= 
> $"value2").explain
> == Physical Plan ==
> Project [key#0,value1#1,value2#13]
> +- Filter (value1#1 >= value2#13)
>    +- SortMergeJoin [key#0], [key#12]
>       :- InMemoryColumnarTableScan [key#0,value1#1], InMemoryRelation 
> [key#0,value1#1], true, 10000, StorageLevel(true, true, false, true, 1), Sort 
> [key#0 ASC], false, 0, None
>       +- InMemoryColumnarTableScan [key#12,value2#13], InMemoryRelation 
> [key#12,value2#13], true, 10000, StorageLevel(true, true, false, true, 1), 
> Sort [key#12 ASC], false, 0, None
> {code}
> What's the best way to avoid the filter pushdown here?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
