This all seems pretty hackish and a lot of trouble to get around limitations in mllib. The big limitation is that right now, the optimization algorithms work on one large dataset at a time. We need a second set of methods that work on a large number of medium-sized datasets. I've started to code a new set of optimization methods to add into mllib, beginning with GroupedGradientDescent ( https://github.com/kellrott/spark/blob/mllib-grouped/mllib/src/main/scala/org/apache/spark/mllib/grouped/GroupedGradientDescent.scala ).

GroupedGradientDescent is based on GradientDescent, but it takes RDD[(Int, (Double, Vector))] as its data input rather than RDD[(Double, Vector)]. The Int serves as a key to mark which elements should be grouped together, which lets you multiplex several dataset optimizations into the same RDD, as sketched below.
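For illustration, here is a minimal sketch of how that multiplexed input could be assembled for many label sets over a shared feature matrix (buildGroupedInput and the variable names here are made up for this example; only GroupedGradientDescent itself exists in the branch so far):

import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

// Sketch: build one RDD that multiplexes many training problems. The Int
// key marks which optimization each (label, vector) pair belongs to.
def buildGroupedInput(
    sc: SparkContext,
    features: Array[Vector],         // shared feature vectors, one per sample
    labelSets: Array[Array[Double]]  // one label array per training problem
): RDD[(Int, (Double, Vector))] = {
  val keyed = labelSets.zipWithIndex.flatMap { case (labels, key) =>
    labels.zip(features).map { case (label, vec) => (key, (label, vec)) }
  }
  sc.parallelize(keyed)
}

The idea is that a single set of grouped methods could then train one model per key over the whole RDD at once, instead of launching a separate set of jobs per dataset.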
I think I've gotten GroupedGradientDescent to work correctly. Next I need to go up the stack and start adding methods like SVMWithSGD.trainGroup.

Does anybody have any thoughts on this?

Kyle

On Fri, Jun 27, 2014 at 6:36 PM, Xiangrui Meng <men...@gmail.com> wrote:
> The RDD is cached in only one or two workers. All other executors need
> to fetch its content via network. Since the dataset is not huge, could
> you try this?
>
> val features: Array[Vector] = ...
> val featuresBc = sc.broadcast(features)
> // parallel loops
> val labels: Array[Double] =
> val rdd = sc.parallelize(0 until 1, 1).flatMap(i =>
>   featuresBc.value.view.zip(labels))
> val model = SVMWithSGD.train(rdd)
> models(i) = model
>
> Using the BT broadcast factory would improve the performance of broadcasting.
>
> Best,
> Xiangrui
>
> On Fri, Jun 27, 2014 at 3:06 PM, Kyle Ellrott <kellr...@soe.ucsc.edu> wrote:
> > 1) I'm using the static SVMWithSGD.train, with no options.
> > 2) I have about 20,000 features (~5000 samples) that are being attached
> > and trained against 14,000 different sets of labels (i.e. I'll be doing
> > 14,000 different training runs against the same set of features, trying
> > to figure out which labels can be learned), and I would also like to do
> > cross-fold validation.
> >
> > The driver doesn't seem to be using too much memory. I left it at -Xmx8g
> > and it never complained.
> >
> > Kyle
> >
> >
> > On Fri, Jun 27, 2014 at 1:18 PM, Xiangrui Meng <men...@gmail.com> wrote:
> >>
> >> Hi Kyle,
> >>
> >> A few questions:
> >>
> >> 1) Did you use `setIntercept(true)`?
> >> 2) How many features?
> >>
> >> I'm a little worried about the driver's load because the final
> >> aggregation and weights update happen on the driver. Did you check the
> >> driver's memory usage as well?
> >>
> >> Best,
> >> Xiangrui
> >>
> >> On Fri, Jun 27, 2014 at 8:10 AM, Kyle Ellrott <kellr...@soe.ucsc.edu>
> >> wrote:
> >> > As far as I can tell, there is no data to broadcast (unless there is
> >> > something internal to mllib that needs to be broadcast). I've
> >> > coalesced the input RDDs to keep the number of partitions limited.
> >> > When running, I've tried to get up to 500 concurrent stages, and I've
> >> > coalesced the RDDs down to 2 partitions, so about 1000 tasks.
> >> > Despite having over 500 threads in the threadpool working on mllib
> >> > tasks, the total CPU usage never really goes above 150%.
> >> > I've tried increasing 'spark.akka.threads', but that doesn't seem to
> >> > do anything.
> >> >
> >> > My one thought is that because I'm using MLUtils.kFold to generate
> >> > the RDDs, I have many tasks working off RDDs that are permutations of
> >> > the original RDDs, and maybe that is creating some sort of dependency
> >> > bottleneck.
> >> >
> >> > Kyle
> >> >
> >> >
> >> > On Thu, Jun 26, 2014 at 6:35 PM, Aaron Davidson <ilike...@gmail.com>
> >> > wrote:
> >> >>
> >> >> I don't have specific solutions for you, but the general things to
> >> >> try are:
> >> >>
> >> >> - Decrease task size by broadcasting any non-trivial objects.
> >> >> - Increase duration of tasks by making them less fine-grained.
> >> >>
> >> >> How many tasks are you sending? I've seen in the past something like
> >> >> 25 seconds for ~10k total medium-sized tasks.
> >> >>
> >> >>
> >> >> On Thu, Jun 26, 2014 at 12:06 PM, Kyle Ellrott <kellr...@soe.ucsc.edu>
> >> >> wrote:
> >> >>>
> >> >>> I'm working to set up a calculation that involves calling mllib's
> >> >>> SVMWithSGD.train several thousand times on different permutations
> >> >>> of the data. I'm trying to run the separate jobs using a threadpool
> >> >>> to dispatch the different requests to a Spark context connected to
> >> >>> a Mesos cluster, using coarse scheduling and a max of 2000 cores,
> >> >>> on Spark 1.0.
> >> >>> Total utilization of the system is terrible. Most of the 'aggregate
> >> >>> at GradientDescent.scala:178' stages (where mllib spends most of
> >> >>> its time) take about 3 seconds, but have ~25 seconds of scheduler
> >> >>> delay time.
> >> >>> What kind of things can I do to improve this?
> >> >>>
> >> >>> Kyle