Evan, Joseph

Thank you for the valuable suggestions. It would be great to improve 
treeAggregate (if possible).

Making fewer updates would certainly make sense, though that would mean using a 
batch gradient method such as LBFGS. It seems that, as of today, it is the only 
viable option in Spark.
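For example, a rough sketch with the existing MLlib optimization API (the 
gradient/updater choices and the 'data' RDD of (label, features) pairs are 
placeholders, not a recommendation):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.optimization.{LBFGS, LeastSquaresGradient, SimpleUpdater}

// A batch method: one gradient aggregation per iteration over the full
// dataset, rather than one model update per mini-batch.
val optimizer = new LBFGS(new LeastSquaresGradient(), new SimpleUpdater())
  .setNumIterations(10)
val weights = optimizer.optimize(data, Vectors.dense(new Array[Double](dataSize)))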

I will also look into how to compress the data sent as an update. Do you know 
of any options besides going from double to single precision (or lower)?
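For illustration, the kind of change I have in mind, reusing the benchmark code 
from below (an untested sketch; it halves the bytes shipped by aggregating 
Float buffers instead of Double, assuming the optimizer tolerates single 
precision):

val result = rdd.treeAggregate((new Array[Float](dataSize), 0.0, 0L))(
      seqOp = (c, v) => {
        // compute the gradient in Double, then accumulate into the Float buffer
        (c._1, c._2, c._3 + 1)
      },
      combOp = (c1, c2) => {
        // sum the single-precision gradients element-wise
        var i = 0
        while (i < c1._1.length) { c1._1(i) += c2._1(i); i += 1 }
        (c1._1, c1._2 + c2._2, c1._3 + c2._3)
      })
// convert back to Double on the driver before the weight update
val grad = result._1.map(_.toDouble)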

Best regards, Alexander

From: Evan Sparks [mailto:evan.spa...@gmail.com]
Sent: Saturday, October 17, 2015 2:24 PM
To: Joseph Bradley
Cc: Ulanov, Alexander; dev@spark.apache.org
Subject: Re: Gradient Descent with large model size

Yes, remember that your bandwidth is the maximum number of bytes per second 
that can be shipped to the driver. So if you've got 5 blocks that size, then it 
looks like you're basically saturating the network.
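(Concretely, with the numbers from the thread below: one 12M-double gradient is 
12M * 8 B = 96 MB ~ 768 Mbit, i.e. ~3.1 s at the measured ~247 Mbit/s, so five 
such blocks funneling into the driver's single link take roughly 5 * 3.1 ~ 15.5 
seconds.)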

Aggregation trees help when there are many partitions/nodes, and butterfly 
mixing can help use all of the network's resources. I have seen implementations 
of butterfly mixing in Spark but don't know if we've got one in mainline. Zhao 
and Canny's work [1] details this approach in the context of model fitting.

At any rate, for this type of ANN work with huge models in *any* distributed 
setting, you're going to need faster networking (most production deployments I 
know of have either 10 gigabit Ethernet or 40 gigabit InfiniBand links), or to 
figure out a way to decrease the frequency or density of updates. Doing both 
would be even better.

[1] http://www.cs.berkeley.edu/~jfc/papers/13/butterflymixing.pdf

On Oct 17, 2015, at 12:47 PM, Joseph Bradley <jos...@databricks.com> wrote:
The decrease in running time from N=6 to N=7 makes some sense to me; that 
should be when tree aggregation kicks in.  I'd also call it an improvement that 
the running time stays at roughly the same ~13 s when going from N=6 to N=9.

"Does this mean that for 5 nodes with treeaggreate of depth 1 it will take 
5*3.1~15.5 seconds?"
--> I would guess so since all of that will be aggregated on the driver, but I 
don't know enough about Spark's shuffling/networking to say for sure.  Others 
may be able to help more.

Your numbers do make me wonder if we should examine the structure of the tree 
aggregation more carefully and see if we can improve it.  
https://issues.apache.org/jira/browse/SPARK-11168

Joseph

On Thu, Oct 15, 2015 at 7:01 PM, Ulanov, Alexander 
<alexander.ula...@hpe.com> wrote:
Hi Joseph,

There seems to be no improvement if I run it with more partitions or a bigger 
depth:
N = 6 Avg time: 13.491579108666668
N = 7 Avg time: 8.929480508
N = 8 Avg time: 14.507123471999998
N = 9 Avg time: 13.854871645333333

Depth = 3
N=2 Avg time: 8.853895346333333
N=5 Avg time: 15.991574924666667

I also measured the bandwidth of my network with iperf. It shows 247 Mbit/s. So 
the transfer of a 12M-element array of doubles should take 64 bits * 12M / 247 
Mbit/s ~ 3.1 s. Does this mean that for 5 nodes with treeAggregate of depth 1 
it will take 5 * 3.1 ~ 15.5 seconds?

Best regards, Alexander
From: Joseph Bradley [mailto:jos...@databricks.com]
Sent: Wednesday, October 14, 2015 11:35 PM
To: Ulanov, Alexander
Cc: dev@spark.apache.org
Subject: Re: Gradient Descent with large model size

For those numbers of partitions, I don't think you'll actually use tree 
aggregation.  The number of partitions needs to be over a certain threshold (>= 
7) before treeAggregate really operates on a tree structure:
https://github.com/apache/spark/blob/9808052b5adfed7dafd6c1b3971b998e45b2799a/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1100
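For reference, the check in that code is roughly the following (paraphrasing 
the linked RDD.scala; details may differ across Spark versions):

val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
// Extra levels of tree aggregation are added only while they would actually
// reduce wall-clock time:
while (numPartitions > scale + numPartitions / scale) {
  numPartitions /= scale
  // ...reduceByKey the partial sums down into numPartitions partitions...
}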

Do you see a slower increase in running time with more partitions?  For 5 
partitions, do you find things improve if you tell treeAggregate to use depth > 
2?
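For example, with your benchmark, depth is the optional last argument of the 
second parameter list (seqOp and combOp being the functions from your code 
below):

val result = rdd.treeAggregate((new Array[Double](dataSize), 0.0, 0L))(
  seqOp, combOp, depth = 3)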

Joseph

On Wed, Oct 14, 2015 at 1:18 PM, Ulanov, Alexander 
<alexander.ula...@hpe.com> wrote:
Dear Spark developers,

I have noticed that Gradient Descent in Spark MLlib takes a long time if the 
model is large. It is implemented with treeAggregate. I've extracted the code 
from GradientDescent.scala to perform the benchmark. It allocates an Array of a 
given size and then aggregates it:

val dataSize = 12000000
val n = 5
val maxIterations = 3
// Materialize an RDD with n partitions up front so that only the
// aggregation itself is timed.
val rdd = sc.parallelize(0 until n, n).cache()
rdd.count()
var avgTime = 0.0
for (i <- 1 to maxIterations) {
  val start = System.nanoTime()
  // Aggregate (gradient, loss, count). The gradient computation is stubbed
  // out, so the measured time is dominated by shipping the 12M-double arrays.
  val result = rdd.treeAggregate((new Array[Double](dataSize), 0.0, 0L))(
        seqOp = (c, v) => {
          // c: (grad, loss, count)
          val l = 0.0
          (c._1, c._2 + l, c._3 + 1)
        },
        combOp = (c1, c2) => {
          // c: (grad, loss, count)
          (c1._1, c1._2 + c2._2, c1._3 + c2._3)
        })
  avgTime += (System.nanoTime() - start) / 1e9
  assert(result._1.length == dataSize)
}
println("Avg time: " + avgTime / maxIterations)

If I run it on my cluster of 1 master and 5 workers, I get the following 
results (given the array size = 12M):
n = 1: Avg time: 4.555709667333333
n = 2: Avg time: 7.059724584666667
n = 3: Avg time: 9.937117377666667
n = 4: Avg time: 12.687526233
n = 5: Avg time: 12.939526129666667

Could you explain why the time becomes so large? The transfer of a 12M-element 
array of doubles (12M * 8 B = 96 MB, i.e. ~768 Mbit) should take about 1 second 
on a 1 Gbit network. There might be other overheads, but not as large as what I 
observe.
Best regards, Alexander

