This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1bc481b  [SPARK-27070] Improve performance of DefaultPartitionCoalescer
1bc481b is described below

commit 1bc481b77960cf69c2561722dc7ca419f1dc7967
Author: fitermay <fiter...@gmail.com>
AuthorDate: Sun Mar 17 11:47:14 2019 -0500

    [SPARK-27070] Improve performance of DefaultPartitionCoalescer
    
    This time tested against Scala 2.11 as well
    
    Closes #24116 from fitermay/master.
    
    Authored-by: fitermay <fiter...@gmail.com>
    Signed-off-by: Sean Owen <sean.o...@databricks.com>
---
 core/benchmarks/CoalescedRDDBenchmark-results.txt  | 40 +++++++++++
 .../scala/org/apache/spark/rdd/CoalescedRDD.scala  | 35 +++++-----
 .../apache/spark/rdd/CoalescedRDDBenchmark.scala   | 80 ++++++++++++++++++++++
 3 files changed, 138 insertions(+), 17 deletions(-)

diff --git a/core/benchmarks/CoalescedRDDBenchmark-results.txt 
b/core/benchmarks/CoalescedRDDBenchmark-results.txt
new file mode 100644
index 0000000..dd63b0a
--- /dev/null
+++ b/core/benchmarks/CoalescedRDDBenchmark-results.txt
@@ -0,0 +1,40 @@
+================================================================================================
+Coalesced RDD , large scale
+================================================================================================
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_201-b09 on Windows 10 10.0
+Intel64 Family 6 Model 63 Stepping 2, GenuineIntel
+Coalesced RDD:                            Best Time(ms)   Avg Time(ms)   
Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+Coalesce Num Partitions: 100 Num Hosts: 1            346            364        
  24          0.3        3458.9       1.0X
+Coalesce Num Partitions: 100 Num Hosts: 5            258            264        
   6          0.4        2579.0       1.3X
+Coalesce Num Partitions: 100 Num Hosts: 10            242            249       
    7          0.4        2415.2       1.4X
+Coalesce Num Partitions: 100 Num Hosts: 20            237            242       
    7          0.4        2371.7       1.5X
+Coalesce Num Partitions: 100 Num Hosts: 40            230            231       
    1          0.4        2299.8       1.5X
+Coalesce Num Partitions: 100 Num Hosts: 80            222            233       
   14          0.4        2223.0       1.6X
+Coalesce Num Partitions: 500 Num Hosts: 1            659            665        
   5          0.2        6590.4       0.5X
+Coalesce Num Partitions: 500 Num Hosts: 5            340            381        
  47          0.3        3395.2       1.0X
+Coalesce Num Partitions: 500 Num Hosts: 10            279            307       
   47          0.4        2788.3       1.2X
+Coalesce Num Partitions: 500 Num Hosts: 20            259            261       
    2          0.4        2591.9       1.3X
+Coalesce Num Partitions: 500 Num Hosts: 40            241            250       
   15          0.4        2406.5       1.4X
+Coalesce Num Partitions: 500 Num Hosts: 80            235            237       
    3          0.4        2349.9       1.5X
+Coalesce Num Partitions: 1000 Num Hosts: 1           1050           1053       
    4          0.1       10503.2       0.3X
+Coalesce Num Partitions: 1000 Num Hosts: 5            405            407       
    2          0.2        4049.5       0.9X
+Coalesce Num Partitions: 1000 Num Hosts: 10            320            322      
     2          0.3        3202.7       1.1X
+Coalesce Num Partitions: 1000 Num Hosts: 20            276            277      
     0          0.4        2762.3       1.3X
+Coalesce Num Partitions: 1000 Num Hosts: 40            257            260      
     5          0.4        2571.2       1.3X
+Coalesce Num Partitions: 1000 Num Hosts: 80            245            252      
    13          0.4        2448.9       1.4X
+Coalesce Num Partitions: 5000 Num Hosts: 1           3099           3145       
   55          0.0       30988.6       0.1X
+Coalesce Num Partitions: 5000 Num Hosts: 5           1037           1050       
   20          0.1       10374.4       0.3X
+Coalesce Num Partitions: 5000 Num Hosts: 10            626            633      
     8          0.2        6261.8       0.6X
+Coalesce Num Partitions: 5000 Num Hosts: 20            426            431      
     5          0.2        4258.6       0.8X
+Coalesce Num Partitions: 5000 Num Hosts: 40            328            341      
    22          0.3        3275.4       1.1X
+Coalesce Num Partitions: 5000 Num Hosts: 80            272            275      
     4          0.4        2721.4       1.3X
+Coalesce Num Partitions: 10000 Num Hosts: 1           5516           5526      
     9          0.0       55156.8       0.1X
+Coalesce Num Partitions: 10000 Num Hosts: 5           1956           1992      
    48          0.1       19560.9       0.2X
+Coalesce Num Partitions: 10000 Num Hosts: 10           1045           1057     
     18          0.1       10447.4       0.3X
+Coalesce Num Partitions: 10000 Num Hosts: 20            637            658     
     24          0.2        6373.2       0.5X
+Coalesce Num Partitions: 10000 Num Hosts: 40            431            448     
     15          0.2        4312.9       0.8X
+Coalesce Num Partitions: 10000 Num Hosts: 80            326            328     
      2          0.3        3263.4       1.1X
+
+
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index 94e7d0b..44bc40b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -58,7 +58,7 @@ private[spark] case class CoalescedRDDPartition(
       val parentPreferredLocations = rdd.context.getPreferredLocs(rdd, 
p.index).map(_.host)
       preferredLocation.exists(parentPreferredLocations.contains)
     }
-    if (parents.size == 0) 0.0 else (loc.toDouble / parents.size.toDouble)
+    if (parents.isEmpty) 0.0 else loc.toDouble / parents.size.toDouble
   }
 }
 
@@ -91,7 +91,7 @@ private[spark] class CoalescedRDD[T: ClassTag](
     pc.coalesce(maxPartitions, prev).zipWithIndex.map {
       case (pg, i) =>
         val ids = pg.partitions.map(_.index).toArray
-        new CoalescedRDDPartition(i, prev, ids, pg.prefLoc)
+        CoalescedRDDPartition(i, prev, ids, pg.prefLoc)
     }
   }
 
@@ -116,7 +116,7 @@ private[spark] class CoalescedRDD[T: ClassTag](
   /**
    * Returns the preferred machine for the partition. If split is of type 
CoalescedRDDPartition,
    * then the preferred machine will be one which most parent splits prefer 
too.
-   * @param partition
+   * @param partition the partition for which to retrieve the preferred 
machine, if exists
    * @return the machine most preferred by split
    */
   override def getPreferredLocations(partition: Partition): Seq[String] = {
@@ -156,9 +156,12 @@ private[spark] class CoalescedRDD[T: ClassTag](
 
 private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
   extends PartitionCoalescer {
-  def compare(o1: PartitionGroup, o2: PartitionGroup): Boolean = 
o1.numPartitions < o2.numPartitions
-  def compare(o1: Option[PartitionGroup], o2: Option[PartitionGroup]): Boolean 
=
-    if (o1 == None) false else if (o2 == None) true else compare(o1.get, 
o2.get)
+
+  implicit object partitionGroupOrdering extends Ordering[PartitionGroup] {
+    override def compare(o1: PartitionGroup, o2: PartitionGroup): Int =
+      java.lang.Integer.compare(o1.numPartitions, o2.numPartitions)
+  }
+
 
   val rnd = new scala.util.Random(7919) // keep this class deterministic
 
@@ -178,7 +181,7 @@ private class DefaultPartitionCoalescer(val balanceSlack: 
Double = 0.10)
     prev.context.getPreferredLocs(prev, part.index).map(tl => tl.host)
   }
 
-  class PartitionLocations(prev: RDD[_]) {
+  private class PartitionLocations(prev: RDD[_]) {
 
     // contains all the partitions from the previous RDD that don't have 
preferred locations
     val partsWithoutLocs = ArrayBuffer[Partition]()
@@ -213,15 +216,14 @@ private class DefaultPartitionCoalescer(val balanceSlack: 
Double = 0.10)
   }
 
   /**
-   * Sorts and gets the least element of the list associated with key in 
groupHash
+   * Gets the least element of the list associated with key in groupHash
    * The returned PartitionGroup is the least loaded of all groups that 
represent the machine "key"
    *
    * @param key string representing a partitioned group on preferred machine 
key
    * @return Option of [[PartitionGroup]] that has least elements for key
    */
-  def getLeastGroupHash(key: String): Option[PartitionGroup] = {
-    groupHash.get(key).map(_.sortWith(compare).head)
-  }
+  def getLeastGroupHash(key: String): Option[PartitionGroup] =
+    groupHash.get(key).filter(_.nonEmpty).map(_.min)
 
   def addPartToPGroup(part: Partition, pgroup: PartitionGroup): Boolean = {
     if (!initialHash.contains(part)) {
@@ -236,12 +238,12 @@ private class DefaultPartitionCoalescer(val balanceSlack: 
Double = 0.10)
    * is assigned a preferredLocation. This uses coupon collector to estimate 
how many
    * preferredLocations it must rotate through until it has seen most of the 
preferred
    * locations (2 * n log(n))
-   * @param targetLen
+   * @param targetLen The number of desired partition groups
    */
   def setupGroups(targetLen: Int, partitionLocs: PartitionLocations) {
     // deal with empty case, just create targetLen partition groups with no 
preferred location
     if (partitionLocs.partsWithLocs.isEmpty) {
-      (1 to targetLen).foreach(x => groupArr += new PartitionGroup())
+      (1 to targetLen).foreach(_ => groupArr += new PartitionGroup())
       return
     }
 
@@ -297,9 +299,8 @@ private class DefaultPartitionCoalescer(val balanceSlack: 
Double = 0.10)
       partitionLocs: PartitionLocations): PartitionGroup = {
     val slack = (balanceSlack * prev.partitions.length).toInt
     // least loaded pref locs
-    val pref = currPrefLocs(p, 
prev).map(getLeastGroupHash(_)).sortWith(compare)
-    val prefPart = if (pref == Nil) None else pref.head
-
+    val pref = currPrefLocs(p, prev).flatMap(getLeastGroupHash)
+    val prefPart = if (pref.isEmpty) None else Some(pref.min)
     val r1 = rnd.nextInt(groupArr.size)
     val r2 = rnd.nextInt(groupArr.size)
     val minPowerOfTwo = {
@@ -351,7 +352,7 @@ private class DefaultPartitionCoalescer(val balanceSlack: 
Double = 0.10)
       val partIter = partitionLocs.partsWithLocs.iterator
       groupArr.filter(pg => pg.numPartitions == 0).foreach { pg =>
         while (partIter.hasNext && pg.numPartitions == 0) {
-          var (nxt_replica, nxt_part) = partIter.next()
+          var (_, nxt_part) = partIter.next()
           if (!initialHash.contains(nxt_part)) {
             pg.partitions += nxt_part
             initialHash += nxt_part
diff --git 
a/core/src/test/scala/org/apache/spark/rdd/CoalescedRDDBenchmark.scala 
b/core/src/test/scala/org/apache/spark/rdd/CoalescedRDDBenchmark.scala
new file mode 100644
index 0000000..42b3070
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/rdd/CoalescedRDDBenchmark.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import scala.collection.immutable
+
+import org.apache.spark.SparkContext
+import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
+
+/**
+ * Benchmark for CoalescedRDD.
+ * Measures rdd.coalesce performance under various combinations of
+ * coalesced partitions and preferred hosts
+ * To run this benchmark:
+ * {{{
+ *   1. without sbt:
+ *      bin/spark-submit --class <this class> --jars <spark core test jar>
+ *   2. build/sbt "core/test:runMain <this class>"
+ *   3. generate result:
+ *      SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/test:runMain <this 
class>"
+ *      Results will be written to "benchmarks/CoalescedRDD-results.txt".
+ * }}}
+ * */
+object CoalescedRDDBenchmark extends BenchmarkBase {
+  val seed = 0x1337
+  val sc = new SparkContext(master = "local[4]", appName = "test")
+
+  private def coalescedRDD(numIters: Int): Unit = {
+    val numBlocks = 100000
+    val benchmark = new Benchmark("Coalesced RDD", numBlocks, output = output)
+    for (numPartitions <- Seq(100, 500, 1000, 5000, 10000)) {
+      for (numHosts <- Seq(1, 5, 10, 20, 40, 80)) {
+
+        import collection.mutable
+        val hosts = mutable.ArrayBuffer[String]()
+        (1 to numHosts).foreach(hosts += "m" + _)
+        hosts.length
+        val rnd = scala.util.Random
+        rnd.setSeed(seed)
+        val blocks: immutable.Seq[(Int, Seq[String])] = (1 to numBlocks).map { 
i =>
+          (i, hosts(rnd.nextInt(hosts.size)) :: Nil)
+        }
+
+        benchmark.addCase(
+          s"Coalesce Num Partitions: $numPartitions Num Hosts: $numHosts",
+          numIters) { _ =>
+          performCoalesce(blocks, numPartitions)
+        }
+      }
+    }
+
+    benchmark.run()
+  }
+
+  private def performCoalesce(blocks: immutable.Seq[(Int, Seq[String])], 
numPartitions: Int) {
+    sc.makeRDD(blocks).coalesce(numPartitions).partitions
+  }
+
+  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    val numIters = 3
+    runBenchmark("Coalesced RDD , large scale") {
+      coalescedRDD(numIters)
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to