Re: Improving Spark multithreaded performance?

2014-07-01 Thread Kyle Ellrott
This all seems pretty hackish and a lot of trouble to get around
limitations in mllib.
The big limitation is that right now, the optimization algorithms work on
one large dataset at a time. We need a second set of methods that work on
a large number of medium-sized datasets.
I've started to code a new set of optimization methods to add into mllib.
I've started with GroupedGradientDescent (
https://github.com/kellrott/spark/blob/mllib-grouped/mllib/src/main/scala/org/apache/spark/mllib/grouped/GroupedGradientDescent.scala
)

GroupedGradientDescent is based on GradientDescent, but it takes
RDD[(Int, (Double, Vector))] as its data input rather than RDD[(Double,
Vector)]. The Int serves as a key to mark which elements should be grouped
together. This lets you multiplex several dataset optimizations into the
same RDD.
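
As a rough illustration of the data layout (a sketch I'm making up here, not
code from the branch; the helper and variable names are hypothetical), several
label sets sharing the same feature rows can be packed into one keyed RDD like
this:

  import org.apache.spark.SparkContext
  import org.apache.spark.mllib.linalg.Vector
  import org.apache.spark.rdd.RDD

  // `features` are the shared feature rows; each element of `labelSets` is
  // the label column for one medium-sized problem.
  def multiplex(sc: SparkContext,
                features: Array[Vector],
                labelSets: Seq[Array[Double]]): RDD[(Int, (Double, Vector))] = {
    val rows = for {
      (labels, groupId) <- labelSets.zipWithIndex // groupId marks one sub-problem
      (label, row)      <- labels.zip(features)
    } yield (groupId, (label, row))
    sc.parallelize(rows) // one RDD multiplexing every grouped optimization
  }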

I think I've gotten GroupedGradientDescent to work correctly. I need to
go up the stack and start adding methods like SVMWithSGD.trainGroup.
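
Purely as a hypothetical sketch of what such an entry point might look like
(none of this exists yet; the name and signature are just placeholders):

  import org.apache.spark.mllib.classification.SVMModel
  import org.apache.spark.mllib.regression.LabeledPoint
  import org.apache.spark.rdd.RDD

  // Hypothetical grouped trainer: one SVM per group key, learned in a
  // single pass over the multiplexed RDD. Not implemented yet.
  object GroupedSVMWithSGD {
    def trainGroup(input: RDD[(Int, LabeledPoint)],
                   numIterations: Int): Map[Int, SVMModel] = ???
  }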

Does anybody have any thoughts on this?

Kyle



On Fri, Jun 27, 2014 at 6:36 PM, Xiangrui Meng men...@gmail.com wrote:

 The RDD is cached in only one or two workers. All other executors need
 to fetch its content via network. Since the dataset is not huge, could
 you try this?

 val features: Array[Vector] = ...
 val featuresBc = sc.broadcast(features)
 // parallel loops
 val labels: Array[Double] = ...
 val rdd = sc.parallelize(0 until 1, 1).flatMap(i =>
   featuresBc.value.view.zip(labels))
 val model = SVMWithSGD.train(rdd)
 models(i) = model

 Using BT broadcast factory would improve the performance of broadcasting.
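
For anyone following along, I believe the BT broadcast factory refers to the
torrent-based broadcast implementation; on Spark 1.0 it can be selected with
something like the following (an untested sketch; the HTTP factory is the
default there):

  import org.apache.spark.SparkConf

  // Sketch: switch broadcast to the torrent ("BT") implementation.
  val conf = new SparkConf()
    .set("spark.broadcast.factory",
         "org.apache.spark.broadcast.TorrentBroadcastFactory")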

 Best,
 Xiangrui

 On Fri, Jun 27, 2014 at 3:06 PM, Kyle Ellrott kellr...@soe.ucsc.edu
 wrote:
  1) I'm using the static SVMWithSGD.train, with no options.
  2) I have about 20,000 features (~5000 samples) that are being attached
  and trained against 14,000 different sets of labels (i.e. I'll be doing
  14,000 different training runs against the same sets of features trying
  to figure out which labels can be learned), and I would also like to do
  cross-fold validation.
 
  The driver doesn't seem to be using too much memory. I left it as -Xmx8g
  and it never complained.
 
  Kyle
 
 
 
  On Fri, Jun 27, 2014 at 1:18 PM, Xiangrui Meng men...@gmail.com wrote:
 
  Hi Kyle,
 
  A few questions:
 
  1) Did you use `setIntercept(true)`?
  2) How many features?
 
  I'm a little worried about driver's load because the final aggregation
  and weights update happen on the driver. Did you check driver's memory
  usage as well?
 
  Best,
  Xiangrui
 
  On Fri, Jun 27, 2014 at 8:10 AM, Kyle Ellrott kellr...@soe.ucsc.edu
  wrote:
   As far as I can tell there is no data to broadcast (unless there is
   something internal to mllib that needs to be broadcast). I've coalesced
   the input RDDs to keep the number of partitions limited. When running,
   I've tried to get up to 500 concurrent stages, and I've coalesced the
   RDDs down to 2 partitions, so about 1000 tasks.
   Despite having over 500 threads in the threadpool working on mllib
   tasks, the total CPU usage never really goes above 150%.
   I've tried increasing 'spark.akka.threads' but that doesn't seem to do
   anything.

   My one thought is that, because I'm using MLUtils.kFold to generate the
   RDDs, having so many tasks working off RDDs that are permutations of the
   original RDDs may be creating some sort of dependency bottleneck.
  
   Kyle
  
  
   On Thu, Jun 26, 2014 at 6:35 PM, Aaron Davidson ilike...@gmail.com
   wrote:
  
   I don't have specific solutions for you, but the general things to
   try are:
  
   - Decrease task size by broadcasting any non-trivial objects.
   - Increase duration of tasks by making them less fine-grained.
  
   How many tasks are you sending? I've seen in the past something like
   25 seconds for ~10k total medium-sized tasks.
  
  
   On Thu, Jun 26, 2014 at 12:06 PM, Kyle Ellrott 
 kellr...@soe.ucsc.edu
   wrote:
  
   I'm working to set up a calculation that involves calling mllib's
   SVMWithSGD.train several thousand times on different permutations of
   the data. I'm trying to run the separate jobs using a threadpool to
   dispatch the different requests to a Spark context connected to a
   Mesos cluster, using coarse-grained scheduling, and a max of 2000
   cores on Spark 1.0.
   Total utilization of the system is terrible. Most of the 'aggregate at
   GradientDescent.scala:178' stages (where mllib spends most of its
   time) take about 3 seconds, but have ~25 seconds of scheduler delay
   time.
   What kind of things can I do to improve this?
  
   Kyle
  
  
  
 
 



Re: Improving Spark multithreaded performance?

2014-06-27 Thread anoldbrain
I have not used this, only watched a presentation of it at Spark Summit 2013.

https://github.com/radlab/sparrow
https://spark-summit.org/talk/ousterhout-next-generation-spark-scheduling-with-sparrow/

This is pure conjecture based on your high scheduling latency and the size
of your cluster, but it seems like one avenue worth looking into.





Re: Improving Spark multithreaded performance?

2014-06-27 Thread Xiangrui Meng
Hi Kyle,

A few questions:

1) Did you use `setIntercept(true)`?
2) How many features?

I'm a little worried about driver's load because the final aggregation
and weights update happen on the driver. Did you check driver's memory
usage as well?

Best,
Xiangrui

On Fri, Jun 27, 2014 at 8:10 AM, Kyle Ellrott kellr...@soe.ucsc.edu wrote:
 As far as I can tell there is no data to broadcast (unless there is
 something internal to mllib that needs to be broadcast). I've coalesced the
 input RDDs to keep the number of partitions limited. When running, I've
 tried to get up to 500 concurrent stages, and I've coalesced the RDDs down
 to 2 partitions, so about 1000 tasks.
 Despite having over 500 threads in the threadpool working on mllib tasks,
 the total CPU usage never really goes above 150%.
 I've tried increasing 'spark.akka.threads' but that doesn't seem to do
 anything.

 My one thought is that, because I'm using MLUtils.kFold to generate the
 RDDs, having so many tasks working off RDDs that are permutations of the
 original RDDs may be creating some sort of dependency bottleneck.

 Kyle
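
On the kFold point, one thing that may be worth trying is materializing each
fold once before dispatching the trainings, so the random-split lineage is not
recomputed by every job. A rough, untested sketch:

  import org.apache.spark.mllib.regression.LabeledPoint
  import org.apache.spark.mllib.util.MLUtils
  import org.apache.spark.rdd.RDD

  // Cache and force each fold so later jobs reuse the materialized data
  // instead of re-deriving it from the original RDD's lineage.
  def materializedFolds(data: RDD[LabeledPoint],
                        numFolds: Int): Array[(RDD[LabeledPoint], RDD[LabeledPoint])] =
    MLUtils.kFold(data, numFolds, 11).map { case (train, validation) =>
      train.cache().count()
      validation.cache().count()
      (train, validation)
    }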


 On Thu, Jun 26, 2014 at 6:35 PM, Aaron Davidson ilike...@gmail.com wrote:

 I don't have specific solutions for you, but the general things to try
 are:

 - Decrease task size by broadcasting any non-trivial objects.
 - Increase duration of tasks by making them less fine-grained.

 How many tasks are you sending? I've seen in the past something like 25
 seconds for ~10k total medium-sized tasks.


 On Thu, Jun 26, 2014 at 12:06 PM, Kyle Ellrott kellr...@soe.ucsc.edu
 wrote:

 I'm working to set up a calculation that involves calling mllib's
 SVMWithSGD.train several thousand times on different permutations of the
 data. I'm trying to run the separate jobs using a threadpool to dispatch the
 different requests to a Spark context connected to a Mesos cluster, using
 coarse-grained scheduling, and a max of 2000 cores on Spark 1.0.
 Total utilization of the system is terrible. Most of the 'aggregate at
 GradientDescent.scala:178' stages (where mllib spends most of its time) take
 about 3 seconds, but have ~25 seconds of scheduler delay time.
 What kind of things can I do to improve this?

 Kyle





Re: Improving Spark multithreaded performance?

2014-06-27 Thread Kyle Ellrott
1) I'm using the static SVMWithSGD.train, with no options.
2) I have about 20,000 features (~5000 samples) that are being attached and
trained against 14,000 different sets of labels (i.e. I'll be doing 14,000
different training runs against the same sets of features trying to figure
out which labels can be learned), and I would also like to do cross-fold
validation.
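
To make the shape of one run concrete, here is a simplified sketch of what a
single label set's training looks like (the names, iteration count, and fold
count are made up, and the thread pool that dispatches 14,000 of these is
omitted):

  import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD}
  import org.apache.spark.mllib.regression.LabeledPoint
  import org.apache.spark.mllib.util.MLUtils
  import org.apache.spark.rdd.RDD

  // One of the ~14,000 runs: the shared ~5000 samples (each with ~20,000
  // features) paired with a single label set, then k-fold cross validation
  // over that small RDD.
  def trainOneLabelSet(data: RDD[LabeledPoint], numFolds: Int = 5): Seq[SVMModel] =
    MLUtils.kFold(data, numFolds, 42).map { case (training, validation) =>
      val model = SVMWithSGD.train(training.coalesce(2), 100)
      // ...score `model` against `validation` and record the metrics...
      model
    }.toSeq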

The driver doesn't seem to be using too much memory. I left it as -Xmx8g
and it never complained.

Kyle



On Fri, Jun 27, 2014 at 1:18 PM, Xiangrui Meng men...@gmail.com wrote:

 Hi Kyle,

 A few questions:

 1) Did you use `setIntercept(true)`?
 2) How many features?

 I'm a little worried about driver's load because the final aggregation
 and weights update happen on the driver. Did you check driver's memory
 usage as well?

 Best,
 Xiangrui

 On Fri, Jun 27, 2014 at 8:10 AM, Kyle Ellrott kellr...@soe.ucsc.edu
 wrote:
  As far as I can tell there is no data to broadcast (unless there is
  something internal to mllib that needs to be broadcast). I've coalesced
  the input RDDs to keep the number of partitions limited. When running,
  I've tried to get up to 500 concurrent stages, and I've coalesced the
  RDDs down to 2 partitions, so about 1000 tasks.
  Despite having over 500 threads in the threadpool working on mllib tasks,
  the total CPU usage never really goes above 150%.
  I've tried increasing 'spark.akka.threads' but that doesn't seem to do
  anything.
 
  My one thought is that, because I'm using MLUtils.kFold to generate the
  RDDs, having so many tasks working off RDDs that are permutations of the
  original RDDs may be creating some sort of dependency bottleneck.
 
  Kyle
 
 
  On Thu, Jun 26, 2014 at 6:35 PM, Aaron Davidson ilike...@gmail.com
 wrote:
 
  I don't have specific solutions for you, but the general things to try
  are:
 
  - Decrease task size by broadcasting any non-trivial objects.
  - Increase duration of tasks by making them less fine-grained.
 
  How many tasks are you sending? I've seen in the past something like 25
  seconds for ~10k total medium-sized tasks.
 
 
  On Thu, Jun 26, 2014 at 12:06 PM, Kyle Ellrott kellr...@soe.ucsc.edu
  wrote:
 
  I'm working to set up a calculation that involves calling mllib's
  SVMWithSGD.train several thousand times on different permutations of
 the
  data. I'm trying to run the separate jobs using a threadpool to
 dispatch the
   different requests to a Spark context connected to a Mesos cluster,
   using coarse-grained scheduling, and a max of 2000 cores on Spark 1.0.
   Total utilization of the system is terrible. Most of the 'aggregate at
   GradientDescent.scala:178' stages (where mllib spends most of its time)
   take about 3 seconds, but have ~25 seconds of scheduler delay time.
  What kind of things can I do to improve this?
 
  Kyle
 
 
 



Improving Spark multithreaded performance?

2014-06-26 Thread Kyle Ellrott
I'm working to set up a calculation that involves calling
mllib's SVMWithSGD.train several thousand times on different permutations
of the data. I'm trying to run the separate jobs using a threadpool to
dispatch the different requests to a Spark context connected to a Mesos
cluster, using coarse-grained scheduling, and a max of 2000 cores on Spark 1.0.
Total utilization of the system is terrible. Most of the 'aggregate at
GradientDescent.scala:178' stages (where mllib spends most of its time) take
about 3 seconds, but have ~25 seconds of scheduler delay time.
What kind of things can I do to improve this?
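
For reference, the setup is roughly the sketch below (simplified; the master
URL, thread-pool size, and iteration count are placeholders rather than my
real values):

  import java.util.concurrent.Executors
  import scala.concurrent.{ExecutionContext, Future}
  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD}
  import org.apache.spark.mllib.regression.LabeledPoint
  import org.apache.spark.rdd.RDD

  // One shared SparkContext on Mesos in coarse-grained mode, capped at 2000 cores.
  val conf = new SparkConf()
    .setAppName("many-svm-trainings")
    .setMaster("mesos://<master>:5050")  // placeholder master URL
    .set("spark.mesos.coarse", "true")
    .set("spark.cores.max", "2000")
  val sc = new SparkContext(conf)

  // Dispatch the independent trainings from a thread pool; each train() call
  // runs as its own set of Spark jobs on the shared context.
  implicit val ec = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(100))
  def trainAll(datasets: Seq[RDD[LabeledPoint]]): Seq[Future[SVMModel]] =
    datasets.map(data => Future { SVMWithSGD.train(data, 100) })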

Kyle


Re: Improving Spark multithreaded performance?

2014-06-26 Thread Aaron Davidson
I don't have specific solutions for you, but the general things to try are:

- Decrease task size by broadcasting any non-trivial objects.
- Increase duration of tasks by making them less fine-grained.

How many tasks are you sending? I've seen in the past something like 25
seconds for ~10k total medium-sized tasks.
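
To illustrate the first point with a quick sketch (the variable names are
invented; the point is only that a closure capturing a large local object
ships it with every task, while a broadcast ships it once per executor):

  import org.apache.spark.SparkContext

  def example(sc: SparkContext, bigLookup: Map[String, Double]): Unit = {
    // Anti-pattern: `bigLookup` is captured by the closure and serialized
    // into every task:
    //   rdd.map(key => bigLookup(key))

    // Better: broadcast it once; each task only carries a small handle.
    val lookupBc = sc.broadcast(bigLookup)
    val rdd = sc.parallelize(Seq("a", "b", "c"))
    rdd.map(key => lookupBc.value.getOrElse(key, 0.0)).collect()
  }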


On Thu, Jun 26, 2014 at 12:06 PM, Kyle Ellrott kellr...@soe.ucsc.edu
wrote:

 I'm working to set up a calculation that involves calling
 mllib's SVMWithSGD.train several thousand times on different permutations
 of the data. I'm trying to run the separate jobs using a threadpool to
 dispatch the different requests to a Spark context connected to a Mesos
 cluster, using coarse-grained scheduling, and a max of 2000 cores on Spark 1.0.
 Total utilization of the system is terrible. Most of the 'aggregate at
 GradientDescent.scala:178' stages (where mllib spends most of its time) take
 about 3 seconds, but have ~25 seconds of scheduler delay time.
 What kind of things can I do to improve this?

 Kyle