Re: Improving Spark multithreaded performance?
This all seems pretty hackish and a lot of trouble to get around limitations in MLlib. The big limitation is that, right now, the optimization algorithms work on one large dataset at a time. We need a second set of methods that work on a large number of medium-sized datasets.

I've started to code a new set of optimization methods to add to MLlib, beginning with GroupedGradientDescent ( https://github.com/kellrott/spark/blob/mllib-grouped/mllib/src/main/scala/org/apache/spark/mllib/grouped/GroupedGradientDescent.scala ). GroupedGradientDescent is based on GradientDescent, but it takes RDD[(Int, (Double, Vector))] as its data input rather than RDD[(Double, Vector)]. The Int serves as a key marking which elements should be grouped together, which lets you multiplex several dataset optimizations into the same RDD (a sketch of this keyed-input idea appears after this message). I think I've gotten GroupedGradientDescent to work correctly. I need to go up the stack and start adding methods like SVMWithSGD.trainGroup.

Does anybody have any thoughts on this?

Kyle

On Fri, Jun 27, 2014 at 6:36 PM, Xiangrui Meng men...@gmail.com wrote:

The RDD is cached on only one or two workers; all other executors need to fetch its contents over the network. Since the dataset is not huge, could you try this?

    val features: Array[Vector] = ...
    val featuresBc = sc.broadcast(features)

    // inside the parallel loops
    val labels: Array[Double] = ...
    val rdd = sc.parallelize(0 until 1, 1)
      .flatMap(i => featuresBc.value.view.zip(labels))
    val model = SVMWithSGD.train(rdd)
    models(i) = model

Using the BT (torrent) broadcast factory would improve the performance of broadcasting.

Best,
Xiangrui

On Fri, Jun 27, 2014 at 3:06 PM, Kyle Ellrott kellr...@soe.ucsc.edu wrote:

1) I'm using the static SVMWithSGD.train, with no options.
2) I have about 20,000 features (~5000 samples) that are being attached to and trained against 14,000 different sets of labels (i.e. I'll be doing 14,000 different training runs against the same set of features, trying to figure out which labels can be learned), and I would also like to do cross-fold validation. The driver doesn't seem to be using too much memory; I left it at -Xmx8g and it never complained.

Kyle

On Fri, Jun 27, 2014 at 1:18 PM, Xiangrui Meng men...@gmail.com wrote:

Hi Kyle,

A few questions:
1) Did you use `setIntercept(true)`?
2) How many features? I'm a little worried about the driver's load, because the final aggregation and weight update happen on the driver. Did you check the driver's memory usage as well?

Best,
Xiangrui

On Fri, Jun 27, 2014 at 8:10 AM, Kyle Ellrott kellr...@soe.ucsc.edu wrote:

As far as I can tell there is no data to broadcast (unless there is something internal to MLlib that needs to be broadcast). I've coalesced the input RDDs to keep the number of partitions limited: when running, I've tried to get up to 500 concurrent stages, and I've coalesced the RDDs down to 2 partitions each, so about 1000 tasks. Despite having over 500 threads in the threadpool working on MLlib tasks, total CPU usage never really goes above 150%. I've tried increasing 'spark.akka.threads', but that doesn't seem to do anything.

My one thought is that, because I'm using MLUtils.kFold to generate the RDDs, I have many tasks working off RDDs that are permutations of the original RDDs, and maybe that is creating some sort of dependency bottleneck.

Kyle

On Thu, Jun 26, 2014 at 6:35 PM, Aaron Davidson ilike...@gmail.com wrote:

I don't have specific solutions for you, but the general things to try are:
- Decrease task size by broadcasting any non-trivial objects.
- Increase the duration of tasks by making them less fine-grained.

How many tasks are you sending? I've seen something like 25 seconds for ~10k total medium-sized tasks in the past.

On Thu, Jun 26, 2014 at 12:06 PM, Kyle Ellrott kellr...@soe.ucsc.edu wrote:

I'm working to set up a calculation that involves calling MLlib's SVMWithSGD.train several thousand times on different permutations of the data. I'm running the separate jobs from a threadpool that dispatches the requests to a SparkContext connected to a Mesos cluster, using coarse-grained scheduling and a max of 2000 cores, on Spark 1.0. Total utilization of the system is terrible: most of the 'aggregate at GradientDescent.scala:178' stages (where MLlib spends most of its time) take about 3 seconds, but have ~25 seconds of scheduler delay.

What kind of things can I do to improve this?

Kyle
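To make the keyed-input idea above concrete, here is a minimal sketch, assuming the grouped optimizer consumes an RDD[(Int, (Double, Vector))] keyed by dataset id as Kyle describes. The multiplex helper and the trainGroup call are hypothetical illustrations, not existing MLlib API:

    import org.apache.spark.SparkContext
    import org.apache.spark.mllib.linalg.Vector
    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.rdd.RDD

    // Tag every (label, features) pair with the id of the dataset it belongs to,
    // so many small training sets can be multiplexed into one RDD.
    def multiplex(sc: SparkContext,
                  datasets: Seq[Seq[LabeledPoint]]): RDD[(Int, (Double, Vector))] = {
      val tagged = datasets.zipWithIndex.flatMap { case (points, datasetId) =>
        points.map(p => (datasetId, (p.label, p.features)))
      }
      sc.parallelize(tagged)
    }

    // Hypothetical grouped entry point (not yet in MLlib, per the message above):
    // val models = SVMWithSGD.trainGroup(multiplexedRdd, numIterations)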
Re: Improving Spark multithreaded performance?
I have not used this, only watched a presentation on it at Spark Summit 2013:

https://github.com/radlab/sparrow
https://spark-summit.org/talk/ousterhout-next-generation-spark-scheduling-with-sparrow/

This is pure conjecture based on your high scheduling latency and the size of your cluster, but it seems like one avenue worth looking at.
Re: Improving Spark multithreaded performance?
Hi Kyle,

A few questions:
1) Did you use `setIntercept(true)`?
2) How many features? I'm a little worried about the driver's load, because the final aggregation and weight update happen on the driver. Did you check the driver's memory usage as well?

Best,
Xiangrui

On Fri, Jun 27, 2014 at 8:10 AM, Kyle Ellrott kellr...@soe.ucsc.edu wrote:

As far as I can tell there is no data to broadcast (unless there is something internal to MLlib that needs to be broadcast). I've coalesced the input RDDs to keep the number of partitions limited: when running, I've tried to get up to 500 concurrent stages, and I've coalesced the RDDs down to 2 partitions each, so about 1000 tasks. Despite having over 500 threads in the threadpool working on MLlib tasks, total CPU usage never really goes above 150%. I've tried increasing 'spark.akka.threads', but that doesn't seem to do anything.

My one thought is that, because I'm using MLUtils.kFold to generate the RDDs, I have many tasks working off RDDs that are permutations of the original RDDs, and maybe that is creating some sort of dependency bottleneck.

Kyle

On Thu, Jun 26, 2014 at 6:35 PM, Aaron Davidson ilike...@gmail.com wrote:

I don't have specific solutions for you, but the general things to try are:
- Decrease task size by broadcasting any non-trivial objects.
- Increase the duration of tasks by making them less fine-grained.

How many tasks are you sending? I've seen something like 25 seconds for ~10k total medium-sized tasks in the past.

On Thu, Jun 26, 2014 at 12:06 PM, Kyle Ellrott kellr...@soe.ucsc.edu wrote:

I'm working to set up a calculation that involves calling MLlib's SVMWithSGD.train several thousand times on different permutations of the data. I'm running the separate jobs from a threadpool that dispatches the requests to a SparkContext connected to a Mesos cluster, using coarse-grained scheduling and a max of 2000 cores, on Spark 1.0. Total utilization of the system is terrible: most of the 'aggregate at GradientDescent.scala:178' stages (where MLlib spends most of its time) take about 3 seconds, but have ~25 seconds of scheduler delay.

What kind of things can I do to improve this?

Kyle
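For reference, a minimal sketch of what enabling the intercept looks like with the class-based API (as opposed to the static SVMWithSGD.train helper mentioned below); the iteration count is an arbitrary placeholder:

    import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD}
    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.rdd.RDD

    def trainWithIntercept(data: RDD[LabeledPoint]): SVMModel = {
      val svm = new SVMWithSGD()
      svm.optimizer.setNumIterations(100)  // placeholder iteration count
      svm.setIntercept(true)               // fit an intercept term as well as the weights
      svm.run(data)
    }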
Re: Improving Spark multithreaded performance?
1) I'm using the static SVMWithSGD.train, with no options.
2) I have about 20,000 features (~5000 samples) that are being attached to and trained against 14,000 different sets of labels (i.e. I'll be doing 14,000 different training runs against the same set of features, trying to figure out which labels can be learned), and I would also like to do cross-fold validation. The driver doesn't seem to be using too much memory; I left it at -Xmx8g and it never complained.

Kyle

On Fri, Jun 27, 2014 at 1:18 PM, Xiangrui Meng men...@gmail.com wrote:

Hi Kyle,

A few questions:
1) Did you use `setIntercept(true)`?
2) How many features? I'm a little worried about the driver's load, because the final aggregation and weight update happen on the driver. Did you check the driver's memory usage as well?

Best,
Xiangrui

On Fri, Jun 27, 2014 at 8:10 AM, Kyle Ellrott kellr...@soe.ucsc.edu wrote:

As far as I can tell there is no data to broadcast (unless there is something internal to MLlib that needs to be broadcast). I've coalesced the input RDDs to keep the number of partitions limited: when running, I've tried to get up to 500 concurrent stages, and I've coalesced the RDDs down to 2 partitions each, so about 1000 tasks. Despite having over 500 threads in the threadpool working on MLlib tasks, total CPU usage never really goes above 150%. I've tried increasing 'spark.akka.threads', but that doesn't seem to do anything.

My one thought is that, because I'm using MLUtils.kFold to generate the RDDs, I have many tasks working off RDDs that are permutations of the original RDDs, and maybe that is creating some sort of dependency bottleneck.

Kyle

On Thu, Jun 26, 2014 at 6:35 PM, Aaron Davidson ilike...@gmail.com wrote:

I don't have specific solutions for you, but the general things to try are:
- Decrease task size by broadcasting any non-trivial objects.
- Increase the duration of tasks by making them less fine-grained.

How many tasks are you sending? I've seen something like 25 seconds for ~10k total medium-sized tasks in the past.

On Thu, Jun 26, 2014 at 12:06 PM, Kyle Ellrott kellr...@soe.ucsc.edu wrote:

I'm working to set up a calculation that involves calling MLlib's SVMWithSGD.train several thousand times on different permutations of the data. I'm running the separate jobs from a threadpool that dispatches the requests to a SparkContext connected to a Mesos cluster, using coarse-grained scheduling and a max of 2000 cores, on Spark 1.0. Total utilization of the system is terrible: most of the 'aggregate at GradientDescent.scala:178' stages (where MLlib spends most of its time) take about 3 seconds, but have ~25 seconds of scheduler delay.

What kind of things can I do to improve this?

Kyle
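A rough sketch of a single cross-fold validation run of the kind described here, built on MLUtils.kFold (which the thread mentions is used to generate the permuted RDDs); fold count, seed, and iteration count are placeholders:

    import org.apache.spark.mllib.classification.SVMWithSGD
    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.mllib.util.MLUtils
    import org.apache.spark.rdd.RDD

    def crossValidate(data: RDD[LabeledPoint], numFolds: Int = 10): Double = {
      // kFold returns an array of (training, validation) RDD pairs
      val folds = MLUtils.kFold(data, numFolds, 42)
      val accuracies = folds.map { case (training, validation) =>
        val model = SVMWithSGD.train(training, 100)  // placeholder iteration count
        val correct = validation.filter(p => model.predict(p.features) == p.label).count()
        correct.toDouble / validation.count()
      }
      accuracies.sum / accuracies.length  // mean accuracy across folds
    }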
Improving Spark multithreaded performance?
I'm working to set up a calculation that involves calling MLlib's SVMWithSGD.train several thousand times on different permutations of the data. I'm running the separate jobs from a threadpool that dispatches the requests to a SparkContext connected to a Mesos cluster, using coarse-grained scheduling and a max of 2000 cores, on Spark 1.0. Total utilization of the system is terrible: most of the 'aggregate at GradientDescent.scala:178' stages (where MLlib spends most of its time) take about 3 seconds, but have ~25 seconds of scheduler delay.

What kind of things can I do to improve this?

Kyle
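A minimal sketch of the driver-side setup being described, i.e. submitting many independent SVMWithSGD.train calls through a threadpool against a single SparkContext (which is thread-safe); the pool size and iteration count are illustrative, not values from the post:

    import java.util.concurrent.Executors
    import scala.concurrent.{Await, ExecutionContext, Future}
    import scala.concurrent.duration.Duration
    import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD}
    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.rdd.RDD

    // Pool of driver-side threads from which training jobs are submitted concurrently.
    implicit val ec: ExecutionContext =
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(16))

    def trainAll(permutations: Seq[RDD[LabeledPoint]]): Seq[SVMModel] = {
      val futures = permutations.map { data =>
        Future { SVMWithSGD.train(data, 100) }  // placeholder iteration count
      }
      futures.map(f => Await.result(f, Duration.Inf))
    }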
Re: Improving Spark multithreaded performance?
I don't have specific solutions for you, but the general things to try are:
- Decrease task size by broadcasting any non-trivial objects.
- Increase the duration of tasks by making them less fine-grained.

How many tasks are you sending? I've seen something like 25 seconds for ~10k total medium-sized tasks in the past.

On Thu, Jun 26, 2014 at 12:06 PM, Kyle Ellrott kellr...@soe.ucsc.edu wrote:

I'm working to set up a calculation that involves calling MLlib's SVMWithSGD.train several thousand times on different permutations of the data. I'm running the separate jobs from a threadpool that dispatches the requests to a SparkContext connected to a Mesos cluster, using coarse-grained scheduling and a max of 2000 cores, on Spark 1.0. Total utilization of the system is terrible: most of the 'aggregate at GradientDescent.scala:178' stages (where MLlib spends most of its time) take about 3 seconds, but have ~25 seconds of scheduler delay.

What kind of things can I do to improve this?

Kyle
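As an illustration of the first suggestion (shrinking task size by broadcasting non-trivial objects), a small sketch; the weights vector here stands in for whatever large object would otherwise be captured in every task closure:

    import org.apache.spark.SparkContext
    import org.apache.spark.mllib.linalg.Vector
    import org.apache.spark.rdd.RDD

    def scoreWithBroadcast(sc: SparkContext, weights: Vector, data: RDD[Vector]): RDD[Double] = {
      // Broadcasting ships `weights` to each executor once, instead of
      // serializing it into every task closure.
      val weightsBc = sc.broadcast(weights)
      data.map { v =>
        // illustrative dot-product scoring against the broadcast copy
        v.toArray.zip(weightsBc.value.toArray).map { case (a, b) => a * b }.sum
      }
    }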