Repository: spark
Updated Branches:
  refs/heads/master 2402b9146 -> 5b08ee639


[SPARK-15671] performance regression CoalesceRDD.pickBin with large #…

I was running a 15 TB join job with 202,000 partitions. The changes
I made to CoalescedRDD in pickBin() turn out to be very slow with that many
partitions: the array filter over that many elements simply takes too long.
Picking bins for all the partitions took about an hour.
Original change:
https://github.com/apache/spark/commit/83ee92f60345f016a390d61a82f1d924f64ddf90

Reverting the pickBin code back to using currPrefLocs fixes the issue.

After reverting the pickBin code, the coalesce takes about 10 seconds, so for
now it makes sense to revert those changes; we can look at further
optimizations later.
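
For context, here is a minimal sketch (plain Scala, with hypothetical names
and stand-in types; not Spark code) of why the filter-based lookup goes
quadratic: partsWithLocs holds one (host, partition) pair per preferred
location, so scanning it once per partition costs roughly 202,000 x ~600,000
comparisons for a job like this, while a per-partition lookup is constant
time per call.

// Sketch only: stand-ins for PartitionLocations.partsWithLocs and
// prev.context.getPreferredLocs; sizes are illustrative.
object PickBinCostSketch {
  type Part = Int
  type Host = String

  val hosts = Seq("host1", "host2", "host3")
  val numParts = 202000

  // One (host, partition) pair per preferred location of each partition.
  val partsWithLocs: Array[(Host, Part)] =
    (0 until numParts).flatMap(p => hosts.map(h => (h, p))).toArray

  // Old pickBin: scans all ~3n pairs for *each* of the n partitions,
  // so binning every partition is O(n^2) comparisons overall.
  def prefLocsByFilter(p: Part): Seq[Host] =
    partsWithLocs.filter(_._2 == p).map(_._1).toSeq

  // Reverted behavior: fetch only the one partition's locations,
  // standing in for prev.context.getPreferredLocs(prev, p.index).
  val locsByPart: Map[Part, Seq[Host]] =
    partsWithLocs.groupBy(_._2).map { case (p, ps) => p -> ps.map(_._1).toSeq }

  def prefLocsDirect(p: Part): Seq[Host] = locsByPart.getOrElse(p, Nil)
}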

Tested this via the RDDSuite unit tests and by manually running the very
large job.

Author: Thomas Graves <tgra...@prevailsail.corp.gq1.yahoo.com>

Closes #13443 from tgravescs/SPARK-15671.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5b08ee63
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5b08ee63
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5b08ee63

Branch: refs/heads/master
Commit: 5b08ee6396aeb4e0aa6139892a27186813c90931
Parents: 2402b91
Author: Thomas Graves <tgra...@prevailsail.corp.gq1.yahoo.com>
Authored: Wed Jun 1 13:21:40 2016 -0700
Committer: Davies Liu <davies....@gmail.com>
Committed: Wed Jun 1 13:21:40 2016 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/rdd/CoalescedRDD.scala    | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5b08ee63/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index c19ed15..2ec9846 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -169,6 +169,11 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
 
   var noLocality = true  // if true if no preferredLocations exists for parent RDD
 
+  // gets the *current* preferred locations from the DAGScheduler (as opposed to the static ones)
+  def currPrefLocs(part: Partition, prev: RDD[_]): Seq[String] = {
+    prev.context.getPreferredLocs(prev, part.index).map(tl => tl.host)
+  }
+
   class PartitionLocations(prev: RDD[_]) {
 
     // contains all the partitions from the previous RDD that don't have preferred locations
@@ -184,7 +189,7 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
       val tmpPartsWithLocs = mutable.LinkedHashMap[Partition, Seq[String]]()
       // first get the locations for each partition, only do this once since it can be expensive
       prev.partitions.foreach(p => {
-          val locs = prev.context.getPreferredLocs(prev, p.index).map(tl => tl.host)
+          val locs = currPrefLocs(p, prev)
           if (locs.size > 0) {
             tmpPartsWithLocs.put(p, locs)
           } else {
@@ -287,9 +292,8 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
       balanceSlack: Double,
       partitionLocs: PartitionLocations): PartitionGroup = {
     val slack = (balanceSlack * prev.partitions.length).toInt
-    val preflocs = partitionLocs.partsWithLocs.filter(_._2 == p).map(_._1).toSeq
     // least loaded pref locs
-    val pref = preflocs.map(getLeastGroupHash(_)).sortWith(compare) // least loaded pref locs
+    val pref = currPrefLocs(p, prev).map(getLeastGroupHash(_)).sortWith(compare)
     val prefPart = if (pref == Nil) None else pref.head
 
     val r1 = rnd.nextInt(groupArr.size)

