[ https://issues.apache.org/jira/browse/SPARK-18067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15808507#comment-15808507 ]
Tejas Patil commented on SPARK-18067:
-------------------------------------

[~hvanhovell] :
* You are right about the data distribution. Both approaches are prone to OOMs, but my approach is more likely to OOM depending on the data. If the tables are bucketed + sorted on a single column, both approaches distribute the data in the same manner. When I checked Hive's behavior, it does the same thing I described, and it works across all our workloads.
* Adding a shuffle makes the job less performant in general, irrespective of data distribution. I assume the "significant performance problems" would only show up if the data is heavily skewed on the join key?
* I have seen OOMs happen with the current Spark approach (which hashes everything). I personally feel that the row buffering should be backed by disk spill to be more reliable. It would run a bit slower, but at least it would be reliable and not page people in the middle of the night due to an OOM. I saw another such scenario in SPARK-19122, and the solution proposed there would help with this issue as well.
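For context, the bucketed + sorted layout discussed above can be produced with Spark's `bucketBy`/`sortBy` writer API. This is a minimal sketch; the DataFrames `df1`/`df2`, the bucket count, and the table names are illustrative, not part of the issue:

{code}
// Write both tables bucketed and sorted on the join column, so a
// sort-merge join can read them without an extra shuffle or sort.
df1.write.bucketBy(8, "key").sortBy("key").saveAsTable("t1")
df2.write.bucketBy(8, "key").sortBy("key").saveAsTable("t2")

// With broadcast joins disabled, this should plan as a SortMergeJoin
// with no Exchange on either side; check with explain().
spark.table("t1").join(spark.table("t2"), "key").explain()
{code}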
> SortMergeJoin adds shuffle if join predicates have non partitioned columns
> --------------------------------------------------------------------------
>
>                 Key: SPARK-18067
>                 URL: https://issues.apache.org/jira/browse/SPARK-18067
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>    Affects Versions: 1.6.1
>            Reporter: Paul Jones
>            Priority: Minor
>
> Basic setup
> {code}
> scala> case class Data1(key: String, value1: Int)
> scala> case class Data2(key: String, value2: Int)
> scala> val partition1 = sc.parallelize(1 to 100000).map(x => Data1(s"$x", x))
>          .toDF.repartition(col("key")).sortWithinPartitions(col("key")).cache
> scala> val partition2 = sc.parallelize(1 to 100000).map(x => Data2(s"$x", x))
>          .toDF.repartition(col("key")).sortWithinPartitions(col("key")).cache
> {code}
> Join on key
> {code}
> scala> partition1.join(partition2, "key").explain
> == Physical Plan ==
> Project [key#0,value1#1,value2#13]
> +- SortMergeJoin [key#0], [key#12]
>    :- InMemoryColumnarTableScan [key#0,value1#1], InMemoryRelation [key#0,value1#1], true, 10000, StorageLevel(true, true, false, true, 1), Sort [key#0 ASC], false, 0, None
>    +- InMemoryColumnarTableScan [key#12,value2#13], InMemoryRelation [key#12,value2#13], true, 10000, StorageLevel(true, true, false, true, 1), Sort [key#12 ASC], false, 0, None
> {code}
> And we get a super efficient join with no shuffle.
> But if we add a filter, our join gets less efficient and we end up with a shuffle.
> {code}
> scala> partition1.join(partition2, "key").filter($"value1" === $"value2").explain
> == Physical Plan ==
> Project [key#0,value1#1,value2#13]
> +- SortMergeJoin [value1#1,key#0], [value2#13,key#12]
>    :- Sort [value1#1 ASC,key#0 ASC], false, 0
>    :  +- TungstenExchange hashpartitioning(value1#1,key#0,200), None
>    :     +- InMemoryColumnarTableScan [key#0,value1#1], InMemoryRelation [key#0,value1#1], true, 10000, StorageLevel(true, true, false, true, 1), Sort [key#0 ASC], false, 0, None
>    +- Sort [value2#13 ASC,key#12 ASC], false, 0
>       +- TungstenExchange hashpartitioning(value2#13,key#12,200), None
>          +- InMemoryColumnarTableScan [key#12,value2#13], InMemoryRelation [key#12,value2#13], true, 10000, StorageLevel(true, true, false, true, 1), Sort [key#12 ASC], false, 0, None
> {code}
> And we can avoid the shuffle if we use a filter condition that can't be pushed into the join.
> {code}
> scala> partition1.join(partition2, "key").filter($"value1" >= $"value2").explain
> == Physical Plan ==
> Project [key#0,value1#1,value2#13]
> +- Filter (value1#1 >= value2#13)
>    +- SortMergeJoin [key#0], [key#12]
>       :- InMemoryColumnarTableScan [key#0,value1#1], InMemoryRelation [key#0,value1#1], true, 10000, StorageLevel(true, true, false, true, 1), Sort [key#0 ASC], false, 0, None
>       +- InMemoryColumnarTableScan [key#12,value2#13], InMemoryRelation [key#12,value2#13], true, 10000, StorageLevel(true, true, false, true, 1), Sort [key#12 ASC], false, 0, None
> {code}
> What's the best way to avoid the filter pushdown here?
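One possible workaround for the question above (a sketch under the issue's setup, not an endorsed fix): wrap the equality in a UDF. Catalyst treats a UDF as opaque, so it cannot rewrite the predicate into extra join keys, and the comparison stays as a plain Filter above the SortMergeJoin:

{code}
scala> import org.apache.spark.sql.functions.udf

// A UDF is opaque to the optimizer, so value1 === value2 cannot be
// turned into additional join keys; the cached partitioning on "key"
// is reused and no Exchange should appear in the plan.
scala> val eq = udf((a: Int, b: Int) => a == b)

scala> partition1.join(partition2, "key")
         .filter(eq($"value1", $"value2"))
         .explain
{code}

The trade-off is that an opaque predicate also defeats any beneficial pushdown (e.g. into the data source), so this is only worthwhile when the shuffle is the dominant cost.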