Repository: spark
Updated Branches:
  refs/heads/master 96ba04bbf -> 20424dad3


[SPARK-2174][MLLIB] treeReduce and treeAggregate

In `reduce` and `aggregate`, the driver node spends linear time on the number 
of partitions. It becomes a bottleneck when there are many partitions and the 
data from each partition is big.

SPARK-1485 (#506) tracks the progress of implementing AllReduce on Spark. I did 
several implementations including butterfly, reduce + broadcast, and treeReduce 
+ broadcast. treeReduce + BT broadcast seems to be right way to go for Spark. 
Using binary tree may introduce some overhead in communication, because the 
driver still need to coordinate on data shuffling. In my experiments, n -> 
sqrt(n) -> 1 gives the best performance in general, which is why I set "depth = 
2" in MLlib algorithms. But it certainly needs more testing.

I left `treeReduce` and `treeAggregate` public for easy testing. Some numbers 
from a test on 32-node m3.2xlarge cluster.

code:

~~~
import breeze.linalg._
import org.apache.log4j._

Logger.getRootLogger.setLevel(Level.OFF)

for (n <- Seq(1, 10, 100, 1000, 10000, 100000, 1000000)) {
  val vv = sc.parallelize(0 until 1024, 1024).map(i => 
DenseVector.zeros[Double](n))
  var start = System.nanoTime(); vv.treeReduce(_ + _, 2); 
println((System.nanoTime() - start) / 1e9)
  start = System.nanoTime(); vv.reduce(_ + _); println((System.nanoTime() - 
start) / 1e9)
}
~~~

out:

| n | treeReduce(,2) | reduce |
|---|---------------------|-----------|
| 10 | 0.215538731 | 0.204206899 |
| 100 | 0.278405907 | 0.205732582 |
| 1000 | 0.208972182 | 0.214298272 |
| 10000 | 0.194792071 | 0.349353687 |
| 100000 | 0.347683285 | 6.086671892 |
| 1000000 | 2.589350682 | 66.572906702 |

CC: @pwendell

This is clearly more scalable than the default implementation. My question is 
whether we should use this implementation in `reduce` and `aggregate` or put 
them as separate methods. The concern is that users may use `reduce` and 
`aggregate` as collect, where having multiple stages doesn't reduce the data 
size. However, in this case, `collect` is more appropriate.

Author: Xiangrui Meng <m...@databricks.com>

Closes #1110 from mengxr/tree and squashes the following commits:

c6cd267 [Xiangrui Meng] make depth default to 2
b04b96a [Xiangrui Meng] address comments
9bcc5d3 [Xiangrui Meng] add depth for readability
7495681 [Xiangrui Meng] fix compile error
142a857 [Xiangrui Meng] merge master
d58a087 [Xiangrui Meng] move treeReduce and treeAggregate to mllib
8a2a59c [Xiangrui Meng] Merge branch 'master' into tree
be6a88a [Xiangrui Meng] use treeAggregate in mllib
0f94490 [Xiangrui Meng] add docs
eb71c33 [Xiangrui Meng] add treeReduce
fe42a5e [Xiangrui Meng] add treeAggregate


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/20424dad
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/20424dad
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/20424dad

Branch: refs/heads/master
Commit: 20424dad30e6c89ba42b07eb329070bdcb3494cb
Parents: 96ba04b
Author: Xiangrui Meng <m...@databricks.com>
Authored: Tue Jul 29 01:16:41 2014 -0700
Committer: Reynold Xin <r...@apache.org>
Committed: Tue Jul 29 01:16:41 2014 -0700

----------------------------------------------------------------------
 .../mllib/linalg/distributed/RowMatrix.scala    | 23 +++----
 .../mllib/optimization/GradientDescent.scala    |  3 +-
 .../apache/spark/mllib/optimization/LBFGS.scala |  3 +-
 .../apache/spark/mllib/rdd/RDDFunctions.scala   | 66 ++++++++++++++++++++
 .../spark/mllib/rdd/RDDFunctionsSuite.scala     | 18 ++++++
 5 files changed, 98 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/20424dad/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
 
b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
index 8c2b044..58c1322 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
@@ -28,6 +28,7 @@ import org.apache.spark.annotation.Experimental
 import org.apache.spark.mllib.linalg._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.Logging
+import org.apache.spark.mllib.rdd.RDDFunctions._
 import org.apache.spark.mllib.stat.{MultivariateOnlineSummarizer, 
MultivariateStatisticalSummary}
 
 /**
@@ -79,7 +80,7 @@ class RowMatrix(
   private[mllib] def multiplyGramianMatrixBy(v: BDV[Double]): BDV[Double] = {
     val n = numCols().toInt
     val vbr = rows.context.broadcast(v)
-    rows.aggregate(BDV.zeros[Double](n))(
+    rows.treeAggregate(BDV.zeros[Double](n))(
       seqOp = (U, r) => {
         val rBrz = r.toBreeze
         val a = rBrz.dot(vbr.value)
@@ -91,9 +92,7 @@ class RowMatrix(
             s"Do not support vector operation from type 
${rBrz.getClass.getName}.")
         }
         U
-      },
-      combOp = (U1, U2) => U1 += U2
-    )
+      }, combOp = (U1, U2) => U1 += U2)
   }
 
   /**
@@ -104,13 +103,11 @@ class RowMatrix(
     val nt: Int = n * (n + 1) / 2
 
     // Compute the upper triangular part of the gram matrix.
-    val GU = rows.aggregate(new BDV[Double](new Array[Double](nt)))(
+    val GU = rows.treeAggregate(new BDV[Double](new Array[Double](nt)))(
       seqOp = (U, v) => {
         RowMatrix.dspr(1.0, v, U.data)
         U
-      },
-      combOp = (U1, U2) => U1 += U2
-    )
+      }, combOp = (U1, U2) => U1 += U2)
 
     RowMatrix.triuToFull(n, GU.data)
   }
@@ -290,9 +287,10 @@ class RowMatrix(
         s"We need at least $mem bytes of memory.")
     }
 
-    val (m, mean) = rows.aggregate[(Long, BDV[Double])]((0L, 
BDV.zeros[Double](n)))(
+    val (m, mean) = rows.treeAggregate[(Long, BDV[Double])]((0L, 
BDV.zeros[Double](n)))(
       seqOp = (s: (Long, BDV[Double]), v: Vector) => (s._1 + 1L, s._2 += 
v.toBreeze),
-      combOp = (s1: (Long, BDV[Double]), s2: (Long, BDV[Double])) => (s1._1 + 
s2._1, s1._2 += s2._2)
+      combOp = (s1: (Long, BDV[Double]), s2: (Long, BDV[Double])) =>
+        (s1._1 + s2._1, s1._2 += s2._2)
     )
 
     updateNumRows(m)
@@ -353,10 +351,9 @@ class RowMatrix(
    * Computes column-wise summary statistics.
    */
   def computeColumnSummaryStatistics(): MultivariateStatisticalSummary = {
-    val summary = rows.aggregate[MultivariateOnlineSummarizer](new 
MultivariateOnlineSummarizer)(
+    val summary = rows.treeAggregate(new MultivariateOnlineSummarizer)(
       (aggregator, data) => aggregator.add(data),
-      (aggregator1, aggregator2) => aggregator1.merge(aggregator2)
-    )
+      (aggregator1, aggregator2) => aggregator1.merge(aggregator2))
     updateNumRows(summary.count)
     summary
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/20424dad/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
 
b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
index 9fd760b..356aa94 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
@@ -25,6 +25,7 @@ import org.apache.spark.annotation.{Experimental, 
DeveloperApi}
 import org.apache.spark.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.mllib.linalg.{Vectors, Vector}
+import org.apache.spark.mllib.rdd.RDDFunctions._
 
 /**
  * Class used to solve an optimization problem using Gradient Descent.
@@ -177,7 +178,7 @@ object GradientDescent extends Logging {
       // Sample a subset (fraction miniBatchFraction) of the total data
       // compute and sum up the subgradients on this subset (this is one 
map-reduce)
       val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + 
i)
-        .aggregate((BDV.zeros[Double](n), 0.0))(
+        .treeAggregate((BDV.zeros[Double](n), 0.0))(
           seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, 
features)) =>
             val l = gradient.compute(features, label, bcWeights.value, 
Vectors.fromBreeze(grad))
             (grad, loss + l)

http://git-wip-us.apache.org/repos/asf/spark/blob/20424dad/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
index 179cd4a..26a2b62 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
@@ -26,6 +26,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.mllib.linalg.{Vectors, Vector}
+import org.apache.spark.mllib.rdd.RDDFunctions._
 
 /**
  * :: DeveloperApi ::
@@ -199,7 +200,7 @@ object LBFGS extends Logging {
       val n = weights.length
       val bcWeights = data.context.broadcast(weights)
 
-      val (gradientSum, lossSum) = data.aggregate((BDV.zeros[Double](n), 0.0))(
+      val (gradientSum, lossSum) = data.treeAggregate((BDV.zeros[Double](n), 
0.0))(
           seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, 
features)) =>
             val l = localGradient.compute(
               features, label, Vectors.fromBreeze(bcWeights.value), 
Vectors.fromBreeze(grad))

http://git-wip-us.apache.org/repos/asf/spark/blob/20424dad/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala
index 365b5e7..b5e403b 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/rdd/RDDFunctions.scala
@@ -20,7 +20,10 @@ package org.apache.spark.mllib.rdd
 import scala.language.implicitConversions
 import scala.reflect.ClassTag
 
+import org.apache.spark.HashPartitioner
+import org.apache.spark.SparkContext._
 import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
 
 /**
  * Machine learning specific RDD functions.
@@ -44,6 +47,69 @@ class RDDFunctions[T: ClassTag](self: RDD[T]) {
       new SlidingRDD[T](self, windowSize)
     }
   }
+
+  /**
+   * Reduces the elements of this RDD in a multi-level tree pattern.
+   *
+   * @param depth suggested depth of the tree (default: 2)
+   * @see [[org.apache.spark.rdd.RDD#reduce]]
+   */
+  def treeReduce(f: (T, T) => T, depth: Int = 2): T = {
+    require(depth >= 1, s"Depth must be greater than or equal to 1 but got 
$depth.")
+    val cleanF = self.context.clean(f)
+    val reducePartition: Iterator[T] => Option[T] = iter => {
+      if (iter.hasNext) {
+        Some(iter.reduceLeft(cleanF))
+      } else {
+        None
+      }
+    }
+    val partiallyReduced = self.mapPartitions(it => 
Iterator(reducePartition(it)))
+    val op: (Option[T], Option[T]) => Option[T] = (c, x) => {
+      if (c.isDefined && x.isDefined) {
+        Some(cleanF(c.get, x.get))
+      } else if (c.isDefined) {
+        c
+      } else if (x.isDefined) {
+        x
+      } else {
+        None
+      }
+    }
+    RDDFunctions.fromRDD(partiallyReduced).treeAggregate(Option.empty[T])(op, 
op, depth)
+      .getOrElse(throw new UnsupportedOperationException("empty collection"))
+  }
+
+  /**
+   * Aggregates the elements of this RDD in a multi-level tree pattern.
+   *
+   * @param depth suggested depth of the tree (default: 2)
+   * @see [[org.apache.spark.rdd.RDD#aggregate]]
+   */
+  def treeAggregate[U: ClassTag](zeroValue: U)(
+      seqOp: (U, T) => U,
+      combOp: (U, U) => U,
+      depth: Int = 2): U = {
+    require(depth >= 1, s"Depth must be greater than or equal to 1 but got 
$depth.")
+    if (self.partitions.size == 0) {
+      return Utils.clone(zeroValue, 
self.context.env.closureSerializer.newInstance())
+    }
+    val cleanSeqOp = self.context.clean(seqOp)
+    val cleanCombOp = self.context.clean(combOp)
+    val aggregatePartition = (it: Iterator[T]) => 
it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
+    var partiallyAggregated = self.mapPartitions(it => 
Iterator(aggregatePartition(it)))
+    var numPartitions = partiallyAggregated.partitions.size
+    val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / 
depth)).toInt, 2)
+    // If creating an extra level doesn't help reduce the wall-clock time, we 
stop tree aggregation.
+    while (numPartitions > scale + numPartitions / scale) {
+      numPartitions /= scale
+      val curNumPartitions = numPartitions
+      partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex { (i, 
iter) =>
+        iter.map((i % curNumPartitions, _))
+      }.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
+    }
+    partiallyAggregated.reduce(cleanCombOp)
+  }
 }
 
 private[mllib]

http://git-wip-us.apache.org/repos/asf/spark/blob/20424dad/mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala 
b/mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala
index 3f3b10d..27a19f7 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/rdd/RDDFunctionsSuite.scala
@@ -46,4 +46,22 @@ class RDDFunctionsSuite extends FunSuite with 
LocalSparkContext {
     val expected = data.flatMap(x => x).sliding(3).toList
     assert(sliding.collect().toList === expected)
   }
+
+  test("treeAggregate") {
+    val rdd = sc.makeRDD(-1000 until 1000, 10)
+    def seqOp = (c: Long, x: Int) => c + x
+    def combOp = (c1: Long, c2: Long) => c1 + c2
+    for (depth <- 1 until 10) {
+      val sum = rdd.treeAggregate(0L)(seqOp, combOp, depth)
+      assert(sum === -1000L)
+    }
+  }
+
+  test("treeReduce") {
+    val rdd = sc.makeRDD(-1000 until 1000, 10)
+    for (depth <- 1 until 10) {
+      val sum = rdd.treeReduce(_ + _, depth)
+      assert(sum === -1000)
+    }
+  }
 }

Reply via email to