This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 21db433  [SPARK-27070] Fix performance bug in DefaultPartitionCoalescer
21db433 is described below

commit 21db4336b08fcb93779d72ebbb0251f3a2d08934
Author:     fitermay <fiter...@gmail.com>
AuthorDate: Thu Mar 14 20:13:18 2019 -0500

    [SPARK-27070] Fix performance bug in DefaultPartitionCoalescer

    When trying to coalesce a UnionRDD of two large FileScanRDDs (each with
    a few million partitions) into around 8k partitions, the driver can
    stall for over an hour. Profiling shows that over 90% of the time is
    spent in TimSort, which is invoked by `pickBin`. This patch replaces
    the sort with a more efficient `min` for the purpose of finding the
    least occupied PartitionGroup.

    Closes #23986 from fitermay/SPARK-27070.

    Authored-by: fitermay <fiter...@gmail.com>
    Signed-off-by: Sean Owen <sean.o...@databricks.com>
---
 core/benchmarks/CoalescedRDDBenchmark-results.txt | 40 +++++++++++
 .../scala/org/apache/spark/rdd/CoalescedRDD.scala | 34 ++++-----
 .../apache/spark/rdd/CoalescedRDDBenchmark.scala  | 80 ++++++++++++++++++++++
 3 files changed, 137 insertions(+), 17 deletions(-)
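For intuition, a minimal self-contained sketch of the change (not the committed
code; `Group` and its loads are made-up stand-ins for `PartitionGroup`): finding
the least-loaded group needs only a single O(n) pass with `min`, while sorting
the whole buffer just to take its head costs O(n log n) on every `pickBin` call.

    import scala.collection.mutable.ArrayBuffer

    object MinVsSortSketch {
      // Hypothetical stand-in for PartitionGroup; only the load matters here.
      case class Group(numPartitions: Int)

      // Order groups by how many partitions they already hold.
      implicit val byLoad: Ordering[Group] = Ordering.by(_.numPartitions)

      def main(args: Array[String]): Unit = {
        val groups = ArrayBuffer(Group(5), Group(2), Group(9), Group(2))

        // Old approach: sort everything, then take the head -- O(n log n) per call.
        val viaSort = groups.sortWith(_.numPartitions < _.numPartitions).head

        // New approach: one linear scan for the minimum -- O(n) per call.
        val viaMin = groups.min

        println(s"sort: $viaSort, min: $viaMin") // both pick a least-loaded group
      }
    }

Repeated over millions of partitions, that per-call sort is what the profile
showed dominating the driver's time.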
diff --git a/core/benchmarks/CoalescedRDDBenchmark-results.txt b/core/benchmarks/CoalescedRDDBenchmark-results.txt
new file mode 100644
index 0000000..dd63b0a
--- /dev/null
+++ b/core/benchmarks/CoalescedRDDBenchmark-results.txt
@@ -0,0 +1,40 @@
+================================================================================================
+Coalesced RDD , large scale
+================================================================================================
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_201-b09 on Windows 10 10.0
+Intel64 Family 6 Model 63 Stepping 2, GenuineIntel
+Coalesced RDD:                             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+Coalesce Num Partitions: 100 Num Hosts: 1            346            364          24          0.3        3458.9       1.0X
+Coalesce Num Partitions: 100 Num Hosts: 5            258            264           6          0.4        2579.0       1.3X
+Coalesce Num Partitions: 100 Num Hosts: 10           242            249           7          0.4        2415.2       1.4X
+Coalesce Num Partitions: 100 Num Hosts: 20           237            242           7          0.4        2371.7       1.5X
+Coalesce Num Partitions: 100 Num Hosts: 40           230            231           1          0.4        2299.8       1.5X
+Coalesce Num Partitions: 100 Num Hosts: 80           222            233          14          0.4        2223.0       1.6X
+Coalesce Num Partitions: 500 Num Hosts: 1            659            665           5          0.2        6590.4       0.5X
+Coalesce Num Partitions: 500 Num Hosts: 5            340            381          47          0.3        3395.2       1.0X
+Coalesce Num Partitions: 500 Num Hosts: 10           279            307          47          0.4        2788.3       1.2X
+Coalesce Num Partitions: 500 Num Hosts: 20           259            261           2          0.4        2591.9       1.3X
+Coalesce Num Partitions: 500 Num Hosts: 40           241            250          15          0.4        2406.5       1.4X
+Coalesce Num Partitions: 500 Num Hosts: 80           235            237           3          0.4        2349.9       1.5X
+Coalesce Num Partitions: 1000 Num Hosts: 1          1050           1053           4          0.1       10503.2       0.3X
+Coalesce Num Partitions: 1000 Num Hosts: 5           405            407           2          0.2        4049.5       0.9X
+Coalesce Num Partitions: 1000 Num Hosts: 10          320            322           2          0.3        3202.7       1.1X
+Coalesce Num Partitions: 1000 Num Hosts: 20          276            277           0          0.4        2762.3       1.3X
+Coalesce Num Partitions: 1000 Num Hosts: 40          257            260           5          0.4        2571.2       1.3X
+Coalesce Num Partitions: 1000 Num Hosts: 80          245            252          13          0.4        2448.9       1.4X
+Coalesce Num Partitions: 5000 Num Hosts: 1          3099           3145          55          0.0       30988.6       0.1X
+Coalesce Num Partitions: 5000 Num Hosts: 5          1037           1050          20          0.1       10374.4       0.3X
+Coalesce Num Partitions: 5000 Num Hosts: 10          626            633           8          0.2        6261.8       0.6X
+Coalesce Num Partitions: 5000 Num Hosts: 20          426            431           5          0.2        4258.6       0.8X
+Coalesce Num Partitions: 5000 Num Hosts: 40          328            341          22          0.3        3275.4       1.1X
+Coalesce Num Partitions: 5000 Num Hosts: 80          272            275           4          0.4        2721.4       1.3X
+Coalesce Num Partitions: 10000 Num Hosts: 1         5516           5526           9          0.0       55156.8       0.1X
+Coalesce Num Partitions: 10000 Num Hosts: 5         1956           1992          48          0.1       19560.9       0.2X
+Coalesce Num Partitions: 10000 Num Hosts: 10        1045           1057          18          0.1       10447.4       0.3X
+Coalesce Num Partitions: 10000 Num Hosts: 20         637            658          24          0.2        6373.2       0.5X
+Coalesce Num Partitions: 10000 Num Hosts: 40         431            448          15          0.2        4312.9       0.8X
+Coalesce Num Partitions: 10000 Num Hosts: 80         326            328           2          0.3        3263.4       1.1X
+
+
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index 94e7d0b..e006f63 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -58,7 +58,7 @@ private[spark] case class CoalescedRDDPartition(
       val parentPreferredLocations = rdd.context.getPreferredLocs(rdd, p.index).map(_.host)
       preferredLocation.exists(parentPreferredLocations.contains)
     }
-    if (parents.size == 0) 0.0 else (loc.toDouble / parents.size.toDouble)
+    if (parents.isEmpty) 0.0 else loc.toDouble / parents.size.toDouble
   }
 }
 
@@ -91,7 +91,7 @@ private[spark] class CoalescedRDD[T: ClassTag](
     pc.coalesce(maxPartitions, prev).zipWithIndex.map {
       case (pg, i) =>
         val ids = pg.partitions.map(_.index).toArray
-        new CoalescedRDDPartition(i, prev, ids, pg.prefLoc)
+        CoalescedRDDPartition(i, prev, ids, pg.prefLoc)
     }
   }
 
@@ -116,7 +116,7 @@ private[spark] class CoalescedRDD[T: ClassTag](
   /**
    * Returns the preferred machine for the partition. If split is of type CoalescedRDDPartition,
    * then the preferred machine will be one which most parent splits prefer too.
-   * @param partition
+   * @param partition the partition for which to retrieve the preferred machine, if exists
    * @return the machine most preferred by split
    */
   override def getPreferredLocations(partition: Partition): Seq[String] = {
@@ -156,9 +156,11 @@ private[spark] class CoalescedRDD[T: ClassTag](
 private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
   extends PartitionCoalescer {
-  def compare(o1: PartitionGroup, o2: PartitionGroup): Boolean = o1.numPartitions < o2.numPartitions
-  def compare(o1: Option[PartitionGroup], o2: Option[PartitionGroup]): Boolean =
-    if (o1 == None) false else if (o2 == None) true else compare(o1.get, o2.get)
+
+  implicit val partitionGroupOrdering: Ordering[PartitionGroup] =
+    (o1: PartitionGroup, o2: PartitionGroup) =>
+      java.lang.Integer.compare(o1.numPartitions, o2.numPartitions)
+
   val rnd = new scala.util.Random(7919) // keep this class deterministic
@@ -178,7 +180,7 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
     prev.context.getPreferredLocs(prev, part.index).map(tl => tl.host)
   }
 
-  class PartitionLocations(prev: RDD[_]) {
+  private class PartitionLocations(prev: RDD[_]) {
 
     // contains all the partitions from the previous RDD that don't have preferred locations
     val partsWithoutLocs = ArrayBuffer[Partition]()
@@ -213,15 +215,14 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
   }
 
   /**
-   * Sorts and gets the least element of the list associated with key in groupHash
+   * Gets the least element of the list associated with key in groupHash
    * The returned PartitionGroup is the least loaded of all groups that represent the machine "key"
    *
    * @param key string representing a partitioned group on preferred machine key
    * @return Option of [[PartitionGroup]] that has least elements for key
    */
-  def getLeastGroupHash(key: String): Option[PartitionGroup] = {
-    groupHash.get(key).map(_.sortWith(compare).head)
-  }
+  def getLeastGroupHash(key: String): Option[PartitionGroup] =
+    groupHash.get(key).filter(_.nonEmpty).map(_.min)
 
   def addPartToPGroup(part: Partition, pgroup: PartitionGroup): Boolean = {
     if (!initialHash.contains(part)) {
@@ -236,12 +237,12 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
    * is assigned a preferredLocation. This uses coupon collector to estimate how many
    * preferredLocations it must rotate through until it has seen most of the preferred
    * locations (2 * n log(n))
-   * @param targetLen
+   * @param targetLen The number of desired partition groups
    */
   def setupGroups(targetLen: Int, partitionLocs: PartitionLocations) {
     // deal with empty case, just create targetLen partition groups with no preferred location
     if (partitionLocs.partsWithLocs.isEmpty) {
-      (1 to targetLen).foreach(x => groupArr += new PartitionGroup())
+      (1 to targetLen).foreach(_ => groupArr += new PartitionGroup())
       return
     }
@@ -297,9 +298,8 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
       partitionLocs: PartitionLocations): PartitionGroup = {
     val slack = (balanceSlack * prev.partitions.length).toInt
     // least loaded pref locs
-    val pref = currPrefLocs(p, prev).map(getLeastGroupHash(_)).sortWith(compare)
-    val prefPart = if (pref == Nil) None else pref.head
-
+    val pref = currPrefLocs(p, prev).flatMap(getLeastGroupHash)
+    val prefPart = if (pref.isEmpty) None else Some(pref.min)
     val r1 = rnd.nextInt(groupArr.size)
     val r2 = rnd.nextInt(groupArr.size)
     val minPowerOfTwo = {
@@ -351,7 +351,7 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
       val partIter = partitionLocs.partsWithLocs.iterator
       groupArr.filter(pg => pg.numPartitions == 0).foreach { pg =>
         while (partIter.hasNext && pg.numPartitions == 0) {
-          var (nxt_replica, nxt_part) = partIter.next()
+          var (_, nxt_part) = partIter.next()
           if (!initialHash.contains(nxt_part)) {
             pg.partitions += nxt_part
             initialHash += nxt_part
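One subtlety in the `getLeastGroupHash` rewrite above: the old
`sortWith(compare).head` would throw on an empty bucket, and `min` likewise
throws on an empty collection, which is why the rewrite adds an explicit
`filter(_.nonEmpty)` guard. A small illustrative sketch, with `Int` loads
standing in for `PartitionGroup`s (hypothetical names, not Spark code):

    import scala.collection.mutable.{ArrayBuffer, HashMap}

    object LeastGroupSketch {
      // Simplified stand-in for groupHash: host -> loads of that host's groups.
      val groupHash = HashMap(
        "host-a" -> ArrayBuffer(3, 1, 4),
        "host-b" -> ArrayBuffer[Int]() // an empty bucket must not crash the lookup
      )

      // Mirrors the patched lookup: skip empty buckets, then take the minimum.
      def leastLoaded(key: String): Option[Int] =
        groupHash.get(key).filter(_.nonEmpty).map(_.min)

      def main(args: Array[String]): Unit = {
        println(leastLoaded("host-a")) // Some(1)
        println(leastLoaded("host-b")) // None; min on the empty buffer would throw
        println(leastLoaded("host-c")) // None; no such key
      }
    }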
diff --git a/core/src/test/scala/org/apache/spark/rdd/CoalescedRDDBenchmark.scala b/core/src/test/scala/org/apache/spark/rdd/CoalescedRDDBenchmark.scala
new file mode 100644
index 0000000..42b3070
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/rdd/CoalescedRDDBenchmark.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import scala.collection.immutable
+
+import org.apache.spark.SparkContext
+import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
+
+/**
+ * Benchmark for CoalescedRDD.
+ * Measures rdd.coalesce performance under various combinations of
+ * coalesced partitions and preferred hosts
+ * To run this benchmark:
+ * {{{
+ *   1. without sbt:
+ *      bin/spark-submit --class <this class> --jars <spark core test jar>
+ *   2. build/sbt "core/test:runMain <this class>"
+ *   3. generate result:
+ *      SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/test:runMain <this class>"
+ *      Results will be written to "benchmarks/CoalescedRDD-results.txt".
+ * }}}
+ * */
+object CoalescedRDDBenchmark extends BenchmarkBase {
+  val seed = 0x1337
+  val sc = new SparkContext(master = "local[4]", appName = "test")
+
+  private def coalescedRDD(numIters: Int): Unit = {
+    val numBlocks = 100000
+    val benchmark = new Benchmark("Coalesced RDD", numBlocks, output = output)
+    for (numPartitions <- Seq(100, 500, 1000, 5000, 10000)) {
+      for (numHosts <- Seq(1, 5, 10, 20, 40, 80)) {
+
+        import collection.mutable
+        val hosts = mutable.ArrayBuffer[String]()
+        (1 to numHosts).foreach(hosts += "m" + _)
+        hosts.length
+        val rnd = scala.util.Random
+        rnd.setSeed(seed)
+        val blocks: immutable.Seq[(Int, Seq[String])] = (1 to numBlocks).map { i =>
+          (i, hosts(rnd.nextInt(hosts.size)) :: Nil)
+        }
+
+        benchmark.addCase(
+          s"Coalesce Num Partitions: $numPartitions Num Hosts: $numHosts",
+          numIters) { _ =>
+          performCoalesce(blocks, numPartitions)
+        }
+      }
+    }
+
+    benchmark.run()
+  }
+
+  private def performCoalesce(blocks: immutable.Seq[(Int, Seq[String])], numPartitions: Int) {
+    sc.makeRDD(blocks).coalesce(numPartitions).partitions
+  }
+
+  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    val numIters = 3
+    runBenchmark("Coalesced RDD , large scale") {
+      coalescedRDD(numIters)
+    }
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org