[ https://issues.apache.org/jira/browse/SPARK-8836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Reynold Xin closed SPARK-8836.
------------------------------
    Resolution: Done

Marking this as done. Users can now get it from Dataset, DataFrame, and SQL.

> Sorted join
> -----------
>
>                 Key: SPARK-8836
>                 URL: https://issues.apache.org/jira/browse/SPARK-8836
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 1.4.0
>            Reporter: Daniel Darabos
>            Priority: Minor
>
> In my [Spark Summit 2015
> presentation|https://spark-summit.org/2015/events/interactive-graph-analytics-with-spark/]
> I touted sorted joins. It would be a shame to talk about how great they are
> and then not try to introduce them into Spark.
>
> When joining co-partitioned RDDs, the current Spark implementation builds a
> map of the contents of one partition and looks up the items from the other
> partition.
> (https://github.com/apache/spark/blob/v1.4.0/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
> using AppendOnlyMap.)
>
> Another option for lining up the keys from the two partitions is to sort them
> both and then merge. Just doing this may already be a performance improvement.
> But what we do is sort the partitions up-front and then enjoy the
> benefits over many operations. Our joins are 10x faster than normal Spark
> joins and don't trigger GC. The hash-based join builds a large hashmap (the
> size of the partition) while the sorted join does not allocate any memory.
> The sorted partitions also benefit other operations, such as distinct, where
> we also avoid building a hashmap. (I think the logic is similar to sort-based
> shuffle, just at a later stage of the process.)
>
> Our implementation is based on zipPartitions, and this is entirely workable.
> We have a custom RDD subclass (SortedRDD) and it overrides a bunch of
> methods. We have an implicit class that adds a toSortedRDD method on
> pair-RDDs.
>
> But I think integrating this into Spark could take it a step further.
> What we have not investigated are cases where the sorting could be skipped. For
> example, when an RDD comes out of a sort-based shuffle, its partitions will be
> sorted, right? So even if the user never asks for the partitions to be
> sorted, they can become so, and the faster sorted implementations of join,
> distinct, etc. could kick in automatically. This would speed up applications
> without any change in their code.
>
> Instead of a subclass, it would probably be best to do this with a simple
> "hasSortedPartitions" variable in the RDD. Then perhaps operations could have
> a "preservesPartitionOrder" parameter, as is done with "partitioner" and
> "preservesPartitioning" now. (For example, filter(), mapValues(), join(), and
> distinct() all keep the partition sorted.)
>
> What do you think about all this?

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
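The following Scala sketch models the proposal outside Spark, to make the two ideas in the report concrete: a merge join that walks two key-sorted partitions with two cursors instead of building a hash map, and a per-partition "sorted" flag that lets a side skip sorting and that key-preserving operations carry forward. Everything in it (`KeyedPartition`, its `isSorted` flag, `SortedJoinSketch`, `ensureSorted`, `mergeJoin`) is hypothetical and is not Spark API. A partition is modeled as an in-memory `Vector` of key-value pairs; in the reporter's setup the equivalent logic would run per partition pair inside `zipPartitions` over iterators, which is not shown here.

```scala
// Hypothetical model of one partition of a pair-RDD. `isSorted` plays the role
// of the "hasSortedPartitions" flag proposed above; none of this is Spark API.
final case class KeyedPartition[K, V](data: Vector[(K, V)], isSorted: Boolean)

object SortedJoinSketch {

  // Sort by key only if the partition is not already known to be sorted.
  def ensureSorted[K, V](p: KeyedPartition[K, V])(implicit ord: Ordering[K]): KeyedPartition[K, V] =
    if (p.isSorted) p
    else KeyedPartition(p.data.sortBy(_._1), isSorted = true)

  // Merge join of two key-sorted vectors: advance two cursors and, on a key
  // match, emit the cross product of the two runs of that key (so duplicate
  // keys on either side are handled). No hash map of either side is built.
  def mergeJoin[K, V, W](left: Vector[(K, V)], right: Vector[(K, W)])(
      implicit ord: Ordering[K]): Vector[(K, (V, W))] = {
    val out = Vector.newBuilder[(K, (V, W))]
    var i = 0
    var j = 0
    while (i < left.length && j < right.length) {
      val c = ord.compare(left(i)._1, right(j)._1)
      if (c < 0) i += 1
      else if (c > 0) j += 1
      else {
        val key = left(i)._1
        // Find where the run of `key` ends on each side.
        var iEnd = i
        while (iEnd < left.length && ord.equiv(left(iEnd)._1, key)) iEnd += 1
        var jEnd = j
        while (jEnd < right.length && ord.equiv(right(jEnd)._1, key)) jEnd += 1
        for (a <- i until iEnd; b <- j until jEnd)
          out += ((key, (left(a)._2, right(b)._2)))
        i = iEnd
        j = jEnd
      }
    }
    out.result()
  }

  // Each side is sorted at most once. The merge emits keys in ascending order,
  // so the output can be flagged as sorted and later operations skip sorting.
  def join[K, V, W](l: KeyedPartition[K, V], r: KeyedPartition[K, W])(
      implicit ord: Ordering[K]): KeyedPartition[K, (V, W)] =
    KeyedPartition(mergeJoin(ensureSorted(l).data, ensureSorted(r).data), isSorted = true)

  // mapValues and filter never change or reorder keys, so they keep the flag
  // (the behavior the proposed "preservesPartitionOrder" would express).
  def mapValues[K, V, U](p: KeyedPartition[K, V])(f: V => U): KeyedPartition[K, U] =
    KeyedPartition(p.data.map { case (k, v) => (k, f(v)) }, p.isSorted)

  def filter[K, V](p: KeyedPartition[K, V])(pred: ((K, V)) => Boolean): KeyedPartition[K, V] =
    KeyedPartition(p.data.filter(pred), p.isSorted)
}

object SortedJoinDemo {
  def main(args: Array[String]): Unit = {
    val left = KeyedPartition(Vector(3 -> "c", 1 -> "a", 2 -> "b", 1 -> "a2"), isSorted = false)
    val right = KeyedPartition(Vector(1 -> 10, 2 -> 20, 2 -> 21, 4 -> 40), isSorted = true)
    // Only `left` gets sorted; `right` is trusted to be sorted already.
    val joined = SortedJoinSketch.join(left, right)
    val scaled = SortedJoinSketch.mapValues(joined) { case (s, n) => s"$s:${n * 2}" }
    println(joined.data)
    println(scaled.isSorted)
  }
}
```

In the demo, only `left` pays for a sort, and both the join result and the `mapValues` output stay flagged as sorted, so a further sorted operation on them would not sort again. Marking the join output as sorted is only valid because the merge visits keys in ascending order; an operation that rewrites keys (for example a general `map`) would have to clear the flag.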