Hi Alexander, Joseph, Evan,

I just wanted to weigh in with an empirical result that we've had on a standalone cluster with 16 nodes and 256 cores.
Typically we run optimization tasks with 256 partitions, i.e. one partition per core, and we find that performance worsens when there are more partitions than physical cores because of the communication operations, which makes sense; the computational work has to be very high to justify that many partitions. However, with this setup, the default of numLevels = 2 in the MLlib methods that use treeAggregate is generally a poor choice for large datasets; it has been empirically far better in our tests to use numLevels = log_2(16). The difference in wall-clock time per iteration for iterative optimization jobs can be huge: it takes 1.5--1.6x *less* time per iteration when using more levels in the tree. I never see a difference when running test suites on a single node while building, but on a large job across the cluster it is very noticeable.

If there are to be any modifications to treeAggregate, I would recommend a heuristic that uses numLevels = log_2(numNodes) or something similar, or making numLevels specifiable in the MLlib APIs instead of defaulting to 2. A rough sketch of what I mean is below.
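Only as an illustration (not a patch against MLlib): the aggregation below just mirrors the shape of Alexander's benchmark quoted further down the thread, but passes an explicit depth to RDD.treeAggregate. The hard-coded numNodes is an assumption; in practice it would have to come from the cluster manager or a configuration value.

// Sketch only: choose the treeAggregate depth from the number of worker nodes
// instead of relying on the default depth = 2.
val numNodes = 16  // assumed known for our cluster; not derived from Spark here
val depth = math.max(2, (math.log(numNodes) / math.log(2)).ceil.toInt)  // log_2(16) = 4

val dataSize = 12000000
val rdd = sc.parallelize(0 until 256, 256).cache()  // 256 partitions, 1 per core
rdd.count()

// Same aggregation shape as the GradientDescent-style benchmark quoted below,
// but with the tree depth passed explicitly.
val result = rdd.treeAggregate((new Array[Double](dataSize), 0.0, 0L))(
  seqOp = (c, v) => (c._1, c._2, c._3 + 1),                    // (grad, loss, count); gradient/loss updates omitted here
  combOp = (c1, c2) => (c1._1, c1._2 + c2._2, c1._3 + c2._3),  // merge partial sums
  depth = depth)                                               // explicit depth instead of the default of 2

The max(2, ...) just keeps the current behavior on small clusters, where extra levels only add shuffle overhead; the deeper tree pays off once the aggregated model is large and the partial sums come from many nodes.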
Mike

On 10/19/15, Ulanov, Alexander <alexander.ula...@hpe.com> wrote:
> Evan, Joseph,
>
> Thank you for the valuable suggestions. It would be great to improve
> treeAggregate (if possible).
>
> Making fewer updates would certainly make sense, though that would mean using
> a batch gradient method such as LBFGS. It seems that today it is the only
> viable option in Spark.
>
> I will also take a look into how to zip the data sent as an update. Do you
> know of any options other than going from double to single precision (or
> less)?
>
> Best regards, Alexander
>
> From: Evan Sparks [mailto:evan.spa...@gmail.com]
> Sent: Saturday, October 17, 2015 2:24 PM
> To: Joseph Bradley
> Cc: Ulanov, Alexander; dev@spark.apache.org
> Subject: Re: Gradient Descent with large model size
>
> Yes, remember that your bandwidth is the maximum number of bytes per second
> that can be shipped to the driver. So if you've got 5 blocks of that size,
> then it looks like you're basically saturating the network.
>
> Aggregation trees help for many partitions/nodes, and butterfly mixing can
> help use all of the network resources. I have seen implementations of
> butterfly mixing in Spark but don't know if we've got one in mainline. Zhao
> and Canny's work [1] details this approach in the context of model fitting.
>
> At any rate, for this type of ANN work with huge models in *any* distributed
> setting, you're going to need to get faster networking (most production
> deployments I know of have either 10 gigabit Ethernet or 40 gigabit
> InfiniBand links), or figure out a way to decrease the frequency or density
> of updates. Both would be even better.
>
> [1] http://www.cs.berkeley.edu/~jfc/papers/13/butterflymixing.pdf
>
> On Oct 17, 2015, at 12:47 PM, Joseph Bradley <jos...@databricks.com> wrote:
>
> The decrease in running time from N=6 to N=7 makes some sense to me; that
> should be when tree aggregation kicks in. I'd call it an improvement to run
> in the same ~13 sec while increasing from N=6 to N=9.
>
> "Does this mean that for 5 nodes with treeAggregate of depth 1 it will take
> 5*3.1 ~ 15.5 seconds?"
> --> I would guess so, since all of that will be aggregated on the driver, but
> I don't know enough about Spark's shuffling/networking to say for sure.
> Others may be able to help more.
>
> Your numbers do make me wonder if we should examine the structure of the
> tree aggregation more carefully and see if we can improve it.
> https://issues.apache.org/jira/browse/SPARK-11168
>
> Joseph
>
> On Thu, Oct 15, 2015 at 7:01 PM, Ulanov, Alexander
> <alexander.ula...@hpe.com> wrote:
> Hi Joseph,
>
> There seems to be no improvement if I run it with more partitions or a bigger
> depth:
> N = 6 Avg time: 13.491579108666668
> N = 7 Avg time: 8.929480508
> N = 8 Avg time: 14.507123471999998
> N = 9 Avg time: 13.854871645333333
>
> Depth = 3
> N = 2 Avg time: 8.853895346333333
> N = 5 Avg time: 15.991574924666667
>
> I also measured the bandwidth of my network with iperf. It shows 247 Mbit/s,
> so the transfer of the 12M array of doubles should take 64 * 12M / 247M ~ 3.1 s.
> Does this mean that for 5 nodes with treeAggregate of depth 1 it will take
> 5 * 3.1 ~ 15.5 seconds?
>
> Best regards, Alexander
>
> From: Joseph Bradley [mailto:jos...@databricks.com]
> Sent: Wednesday, October 14, 2015 11:35 PM
> To: Ulanov, Alexander
> Cc: dev@spark.apache.org
> Subject: Re: Gradient Descent with large model size
>
> For those numbers of partitions, I don't think you'll actually use tree
> aggregation. The number of partitions needs to be over a certain threshold
> (>= 7) before treeAggregate really operates on a tree structure:
> https://github.com/apache/spark/blob/9808052b5adfed7dafd6c1b3971b998e45b2799a/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L1100
>
> Do you see a slower increase in running time with more partitions? For 5
> partitions, do you find things improve if you tell treeAggregate to use
> depth > 2?
>
> Joseph
>
> On Wed, Oct 14, 2015 at 1:18 PM, Ulanov, Alexander
> <alexander.ula...@hpe.com> wrote:
> Dear Spark developers,
>
> I have noticed that Gradient Descent in Spark MLlib takes a long time if the
> model is large. It is implemented with treeAggregate. I've extracted the code
> from GradientDescent.scala to perform the benchmark. It allocates an Array of
> a given size and then aggregates it:
>
> val dataSize = 12000000
> val n = 5
> val maxIterations = 3
> val rdd = sc.parallelize(0 until n, n).cache()
> rdd.count()
> var avgTime = 0.0
> for (i <- 1 to maxIterations) {
>   val start = System.nanoTime()
>   val result = rdd.treeAggregate((new Array[Double](dataSize), 0.0, 0L))(
>     seqOp = (c, v) => {
>       // c: (grad, loss, count)
>       val l = 0.0
>       (c._1, c._2 + l, c._3 + 1)
>     },
>     combOp = (c1, c2) => {
>       // c: (grad, loss, count)
>       (c1._1, c1._2 + c2._2, c1._3 + c2._3)
>     })
>   avgTime += (System.nanoTime() - start) / 1e9
>   assert(result._1.length == dataSize)
> }
> println("Avg time: " + avgTime / maxIterations)
>
> If I run it on my cluster of 1 master and 5 workers, I get the following
> results (given the array size = 12M):
> n = 1 Avg time: 4.555709667333333
> n = 2 Avg time: 7.059724584666667
> n = 3 Avg time: 9.937117377666667
> n = 4 Avg time: 12.687526233
> n = 5 Avg time: 12.939526129666667
>
> Could you explain why the time becomes so big? The data transfer of a 12M
> array of doubles should take ~1 second on a 1 Gbit network. There might be
> other overheads, but not as big as what I observe.
>
> Best regards, Alexander

--
Thanks,
Mike

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org