Repository: spark Updated Branches: refs/heads/master 2402b9146 -> 5b08ee639
[SPARK-15671] performance regression CoalesceRDD.pickBin with large #… I was running a 15TB join job with 202000 partitions. It looks like the changes I made to CoalesceRDD in pickBin() are really slow with that large of partitions. The array filter with that many elements just takes too long. It took about an hour for it to pickBins for all the partitions. Original change: https://github.com/apache/spark/commit/83ee92f60345f016a390d61a82f1d924f64ddf90 Just reverting the pickBin code back to get currPrefLocs fixes the issue. After reverting the pickBin code the coalesce takes about 10 seconds, so for now it makes sense to revert those changes and we can look at further optimizations later. Tested this via RDDSuite unit test and manually testing the very large job. Author: Thomas Graves <tgra...@prevailsail.corp.gq1.yahoo.com> Closes #13443 from tgravescs/SPARK-15671. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5b08ee63 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5b08ee63 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5b08ee63 Branch: refs/heads/master Commit: 5b08ee6396aeb4e0aa6139892a27186813c90931 Parents: 2402b91 Author: Thomas Graves <tgra...@prevailsail.corp.gq1.yahoo.com> Authored: Wed Jun 1 13:21:40 2016 -0700 Committer: Davies Liu <davies....@gmail.com> Committed: Wed Jun 1 13:21:40 2016 -0700 ---------------------------------------------------------------------- .../main/scala/org/apache/spark/rdd/CoalescedRDD.scala | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/5b08ee63/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala index c19ed15..2ec9846 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala @@ -169,6 +169,11 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10) var noLocality = true // if true if no preferredLocations exists for parent RDD + // gets the *current* preferred locations from the DAGScheduler (as opposed to the static ones) + def currPrefLocs(part: Partition, prev: RDD[_]): Seq[String] = { + prev.context.getPreferredLocs(prev, part.index).map(tl => tl.host) + } + class PartitionLocations(prev: RDD[_]) { // contains all the partitions from the previous RDD that don't have preferred locations @@ -184,7 +189,7 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10) val tmpPartsWithLocs = mutable.LinkedHashMap[Partition, Seq[String]]() // first get the locations for each partition, only do this once since it can be expensive prev.partitions.foreach(p => { - val locs = prev.context.getPreferredLocs(prev, p.index).map(tl => tl.host) + val locs = currPrefLocs(p, prev) if (locs.size > 0) { tmpPartsWithLocs.put(p, locs) } else { @@ -287,9 +292,8 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10) balanceSlack: Double, partitionLocs: PartitionLocations): PartitionGroup = { val slack = (balanceSlack * prev.partitions.length).toInt - val preflocs = partitionLocs.partsWithLocs.filter(_._2 == p).map(_._1).toSeq // least loaded pref locs - val pref = preflocs.map(getLeastGroupHash(_)).sortWith(compare) // least loaded pref locs + val pref = currPrefLocs(p, prev).map(getLeastGroupHash(_)).sortWith(compare) val prefPart = if (pref == Nil) None else pref.head val r1 = rnd.nextInt(groupArr.size) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: 
commits-h...@spark.apache.org