[ https://issues.apache.org/jira/browse/SPARK-8836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Reynold Xin closed SPARK-8836.
------------------------------
    Resolution: Done

Marking this as done. Users can now get this from Dataset, DataFrame, and SQL.


> Sorted join
> -----------
>
>                 Key: SPARK-8836
>                 URL: https://issues.apache.org/jira/browse/SPARK-8836
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 1.4.0
>            Reporter: Daniel Darabos
>            Priority: Minor
>
> In my [Spark Summit 2015 
> presentation|https://spark-summit.org/2015/events/interactive-graph-analytics-with-spark/]
>  I touted sorted joins. It would be a shame to talk about how great they are 
> and then not try to introduce them into Spark.
> When joining co-partitioned RDDs, the current Spark implementation builds a 
> map of the contents of one partition and looks up the items from the other 
> partition. 
> (https://github.com/apache/spark/blob/v1.4.0/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
>  using AppendOnlyMap.)
> Another option for lining up the keys from the two partitions is to sort them 
> both and then merge. Just doing this may already be a performance improvement.
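> A minimal sketch of that merge, assuming both inputs are iterators that are
> already sorted by key and that keys are unique within each iterator (the
> names are illustrative, not from the Spark codebase):
>
>     def mergeJoin[K, V, W](
>         left: Iterator[(K, V)],
>         right: Iterator[(K, W)])(implicit ord: Ordering[K]): Iterator[(K, (V, W))] = {
>       val l = left.buffered
>       val r = right.buffered
>       new Iterator[(K, (V, W))] {
>         // Advance whichever side has the smaller key until the heads match.
>         private def align(): Boolean = {
>           while (l.hasNext && r.hasNext) {
>             val c = ord.compare(l.head._1, r.head._1)
>             if (c == 0) return true
>             else if (c < 0) l.next()
>             else r.next()
>           }
>           false
>         }
>         def hasNext: Boolean = align()
>         def next(): (K, (V, W)) = {
>           if (!align()) throw new NoSuchElementException("next on empty iterator")
>           val (k, v) = l.next()
>           val (_, w) = r.next()
>           (k, (v, w))
>         }
>       }
>     }
>
> Duplicate keys would need the usual group-and-cross-product handling, but the
> single forward pass with no per-partition hashmap is the point.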
> What we do instead is sort the partitions up-front and then enjoy the 
> benefits across many operations. Our joins are 10x faster than normal Spark 
> joins and don't trigger GC. The hash-based join builds a large hashmap (the 
> size of the partition) while the sorted join does not allocate any memory. 
> The sorted partitions also benefit other operations, such as distinct, where 
> we also avoid building a hashmap. (I think the logic is similar to sort-based 
> shuffle, just at a later stage of the process.)
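> On a sorted partition, for instance, distinct is just a scan that drops
> adjacent duplicates; roughly (an illustrative helper, not Spark code):
>
>     // Within a partition that is already sorted, distinct reduces to
>     // dropping adjacent duplicates, so no hash set is built.
>     def distinctSorted[A](iter: Iterator[A]): Iterator[A] = {
>       var prev: Option[A] = None
>       iter.filter { a =>
>         val keep = prev != Some(a)
>         prev = Some(a)
>         keep
>       }
>     }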
> Our implementation is based on zipPartitions, and this is entirely workable. 
> We have a custom RDD subclass (SortedRDD) and it overrides a bunch of 
> methods. We have an implicit class that adds a toSortedRDD method on 
> pair-RDDs.
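> To give a flavor of the idea (the names are hypothetical and this is not the
> actual SortedRDD code): once two pair-RDDs share a partitioner and every
> partition is sorted by key, the join is a per-partition merge via
> zipPartitions, assuming a serializable mergeJoin helper like the sketch above
> is in scope:
>
>     import org.apache.spark.rdd.RDD
>
>     implicit class SortedJoinOps[K: Ordering, V](self: RDD[(K, V)]) {
>       def sortedJoin[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = {
>         require(self.partitioner.isDefined && self.partitioner == other.partitioner,
>           "sortedJoin expects co-partitioned inputs")
>         // No shuffle and no per-partition hashmap: each pair of co-located
>         // partitions is merged in a single pass.
>         self.zipPartitions(other, preservesPartitioning = true) { (left, right) =>
>           mergeJoin(left, right)
>         }
>       }
>     }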
> But I think integrating this into Spark could take it a step further. What we 
> have not investigated is cases where the sorting could be skipped. For 
> example, when an RDD comes out of a sort-based shuffle, its partitions will 
> already be sorted, right? So even if the user never asks for the partitions 
> to be sorted, they can become so, and the faster sorted implementations of 
> join, distinct, etc. could kick in automatically. This would speed up applications 
> without any change in their code.
> Instead of a subclass it would probably be best to do this with a simple 
> "hasSortedPartitions" variable in the RDD. Then perhaps operations could have 
> a "preservesPartitionOrder" parameter, like it is done with "partitioner" and 
> "preservesPartitioning" now. (For example filter(), mapValues(), join(), and 
> distinct() all keep the partition sorted.)
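> The existing preservesPartitioning parameter shows how such a flag is
> threaded through today; a small illustration (the preservesPartitionOrder
> part is hypothetical, not an existing Spark API):
>
>     import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
>
>     val sc = new SparkContext(
>       new SparkConf().setAppName("preserves-flag-sketch").setMaster("local[*]"))
>
>     val pairs = sc.parallelize(Seq(1 -> "a", 2 -> "b", 3 -> "c"))
>       .partitionBy(new HashPartitioner(4))
>
>     // The values change but the keys (and hence the partitioning) do not, so
>     // the caller promises preservesPartitioning = true and the partitioner
>     // is kept. A preservesPartitionOrder flag could carry a "partitions are
>     // sorted by key" property through such operations in the same way.
>     val upper = pairs.mapPartitions(
>       iter => iter.map { case (k, v) => (k, v.toUpperCase) },
>       preservesPartitioning = true)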
> What do you think about all this?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
