Hi Alexander, Joseph, Evan,

I just wanted to weigh in with an empirical result that we've had on a
standalone cluster with 16 nodes and 256 cores.

Typically we run optimization tasks with 256 partitions, i.e. 1 partition
per core, and find that performance in communication operations worsens
once there are more partitions than physical cores, which makes sense;
the computational work has to be very high to justify that many
partitions.

However, with this setup, the default of numLevels = 2 in MLlib
methods using treeAggregate is generally a poor choice for large
datasets; it has been empirically far better in our tests to use
numLevels = log_2(16) = 4. The difference in wall-clock time per
iteration for iterative optimization jobs can be huge: using more
levels in the tree takes 1.5-1.6x *less* time per iteration. I never
see a difference when running the test suites on a single node during
builds, but on a large job across the cluster it's very noticeable.

If there are to be any modifications to treeAggregate, I would
recommend a heuristic that uses numLevels = log_2(numNodes) or
something similar, or making numLevels specifiable in the MLlib APIs
instead of defaulting to 2.
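
For concreteness, here is roughly the heuristic I have in mind. This is
just a sketch: the parameter is called "depth" in RDD.treeAggregate
rather than numLevels, numNodes would have to come from the deployment
or a config value, and rdd/zeroValue/seqOp/combOp stand for whatever
the job already passes.

  // assumed: the cluster size is known, e.g. 16 nodes in our case
  val numNodes = 16
  // use ~log_2(numNodes) levels instead of the default of 2, never fewer than 2
  val depth = math.max(2, (math.log(numNodes) / math.log(2)).round.toInt)
  val result = rdd.treeAggregate(zeroValue)(seqOp, combOp, depth)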

Mike


On 10/19/15, Ulanov, Alexander <alexander.ula...@hpe.com> wrote:
> Evan, Joseph
>
> Thank you for the valuable suggestions. It would be great to improve
> treeAggregate (if possible).
>
> Making fewer updates would certainly make sense, though that will mean using a
> batch gradient method such as L-BFGS. As of today it seems to be the only
> viable option in Spark.
>
> I will also look into how to compress the data sent as an update. Do you know
> of any options other than going from double to single precision (or lower)?
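>
> As a rough illustration of the single-precision idea (just a sketch, not
> something I have measured; "grad" is a stand-in for the dense gradient array):
>
>   // halve the size of the shipped update by narrowing to Float...
>   val gradF: Array[Float] = grad.map(_.toFloat)
>   // ...aggregate the Array[Float] instead of the Array[Double], then widen
>   // the combined result back to Double on the driver:
>   val gradD: Array[Double] = gradF.map(_.toDouble)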
>
> Best regards, Alexander
>
> From: Evan Sparks [mailto:evan.spa...@gmail.com]
> Sent: Saturday, October 17, 2015 2:24 PM
> To: Joseph Bradley
> Cc: Ulanov, Alexander; dev@spark.apache.org
> Subject: Re: Gradient Descent with large model size
>
> Yes, remember that your bandwidth is the maximum number of bytes per second
> that can be shipped to the driver. So if you've got 5 blocks that size, then
> it looks like you're basically saturating the network.
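>
> Back-of-the-envelope with the numbers from further down the thread: 12M doubles
> is about 96 MB, i.e. roughly 0.77 Gbit per partition, so 5 partitions push
> roughly 3.8 Gbit toward the driver; at the measured ~247 Mbit/s that is on the
> order of 15 seconds, which is in the same ballpark as the timings below.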
>
> Aggregation trees help for many partitions/nodes and butterfly mixing can
> help use all of the network resources. I have seen implementations of
> butterfly mixing in Spark but don't know if we've got one in mainline. Zhao
> and Canny's work [1] details this approach in the context of model fitting.
>
> At any rate, for this type of ANN work with huge models in *any* distributed
> setting, you're going to need to get faster networking (most production
> deployments I know of have either 10 gigabit Ethernet or 40 gigabit
> InfiniBand links), or figure out a way to decrease the frequency or density of
> updates. Doing both would be even better.
>
> [1] http://www.cs.berkeley.edu/~jfc/papers/13/butterflymixing.pdf
>
> On Oct 17, 2015, at 12:47 PM, Joseph Bradley
> <jos...@databricks.com<mailto:jos...@databricks.com>> wrote:
> The decrease in running time from N=6 to N=7 makes some sense to me; that
> should be when tree aggregation kicks in.  I'd call it an improvement that the
> running time stays at roughly the same ~13 sec as N increases from 6 to 9.
>
> "Does this mean that for 5 nodes with treeaggreate of depth 1 it will take
> 5*3.1~15.5 seconds?"
> --> I would guess so since all of that will be aggregated on the driver, but
> I don't know enough about Spark's shuffling/networking to say for sure.
> Others may be able to help more.
>
> Your numbers do make me wonder if we should examine the structure of the
> tree aggregation more carefully and see if we can improve it.
> https://issues.apache.org/jira/browse/SPARK-11168
>
> Joseph
>
> On Thu, Oct 15, 2015 at 7:01 PM, Ulanov, Alexander
> <alexander.ula...@hpe.com<mailto:alexander.ula...@hpe.com>> wrote:
> Hi Joseph,
>
> There seems to be no improvement if I run it with more partitions or a bigger
> depth:
> N = 6 Avg time: 13.491579108666668
> N = 7 Avg time: 8.929480508
> N = 8 Avg time: 14.507123471999998
> N = 9 Avg time: 13.854871645333333
>
> Depth = 3
> N = 2 Avg time: 8.853895346333333
> N = 5 Avg time: 15.991574924666667
>
> I also measured the bandwidth of my network with iperf. It shows 247 Mbit/s.
> So the transfer of a 12M-element array of doubles should take 64 bits * 12M /
> 247 Mbit/s ~ 3.1 s. Does this mean that for 5 nodes with treeAggregate of
> depth 1 it will take 5 * 3.1 ~ 15.5 seconds?
>
> Best regards, Alexander
> From: Joseph Bradley
> [mailto:jos...@databricks.com<mailto:jos...@databricks.com>]
> Sent: Wednesday, October 14, 2015 11:35 PM
> To: Ulanov, Alexander
> Cc: dev@spark.apache.org<mailto:dev@spark.apache.org>
> Subject: Re: Gradient Descent with large model size
>
> For those numbers of partitions, I don't think you'll actually use tree
> aggregation.  The number of partitions needs to be over a certain threshold
> (>= 7) before treeAggregate really operates on a tree structure:
> https://github.com/apache/spark/blob/9808052b5adfed7dafd6c1b3971b998e45b2799a/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1100
>
> Do you see a slower increase in running time with more partitions?  For 5
> partitions, do you find things improve if you tell treeAggregate to use
> depth > 2?
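>
> (For reference, depth is the optional third argument in the second parameter
> list of treeAggregate, so in the benchmark below it would look something like
>
>   rdd.treeAggregate(zero)(seqOp, combOp, depth = 3)
>
> where zero/seqOp/combOp are the same values the benchmark already passes.)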
>
> Joseph
>
> On Wed, Oct 14, 2015 at 1:18 PM, Ulanov, Alexander
> <alexander.ula...@hpe.com<mailto:alexander.ula...@hpe.com>> wrote:
> Dear Spark developers,
>
> I have noticed that Gradient Descent in Spark MLlib takes a long time if the
> model is large. It is implemented with treeAggregate. I've extracted the
> code from GradientDescent.scala to perform the benchmark. It allocates an
> Array of a given size and then aggregates it:
>
> val dataSize = 12000000
> val n = 5
> val maxIterations = 3
> val rdd = sc.parallelize(0 until n, n).cache()
> rdd.count()
> var avgTime = 0.0
> for (i <- 1 to maxIterations) {
>   val start = System.nanoTime()
>   val result = rdd.treeAggregate((new Array[Double](dataSize), 0.0, 0L))(
>         seqOp = (c, v) => {
>           // c: (grad, loss, count)
>           val l = 0.0 // dummy loss; we only measure the aggregation itself
>           (c._1, c._2 + l, c._3 + 1)
>         },
>         combOp = (c1, c2) => {
>           // c1, c2: (grad, loss, count)
>           (c1._1, c1._2 + c2._2, c1._3 + c2._3)
>         })
>   avgTime += (System.nanoTime() - start) / 1e9
>   assert(result._1.length == dataSize)
> }
> println("Avg time: " + avgTime / maxIterations)
>
> If I run it on my cluster of 1 master and 5 workers, I get the following
> results (with the array size = 12M):
> n = 1 Avg time: 4.555709667333333
> n = 2 Avg time: 7.059724584666667
> n = 3 Avg time: 9.937117377666667
> n = 4 Avg time: 12.687526233
> n = 5 Avg time: 12.939526129666667
>
> Could you explain why the time becomes so large? The transfer of a 12M-element
> array of doubles should take ~1 second on a 1 Gbit network. There might be
> other overheads, but not as large as what I observe.
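>
> (For the estimate: 12M doubles * 8 bytes ~ 96 MB ~ 0.77 Gbit, so roughly 0.8 s
> on a 1 Gbit/s link, ignoring serialization and task-scheduling overhead.)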
> Best regards, Alexander
>
>
>


-- 
Thanks,
Mike

