Re: How to parallelize model fitting with different cross-validation folds?

2014-07-05 Thread Sean Owen
If you call .par on data_kfolded, it becomes a Scala parallel collection,
so the maps will happen in parallel.
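
A minimal sketch of the same driver-side idea, assuming plain Scala rather than Spark. It uses scala.concurrent.Future instead of .par because parallel collections live in a separate module from Scala 2.13 onward. The names ParallelFolds, fitAndScore and fitAll are hypothetical, and fitAndScore is only a stand-in for "train on one fold and return its validation error".

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ParallelFolds {
  // Hypothetical stand-in for training a model on one fold and
  // returning its validation error; no Spark is involved here.
  def fitAndScore(fold: Int): Double = {
    Thread.sleep(50) // pretend this is a long-running job
    fold / 10.0
  }

  // Start one Future per fold so the folds are processed by
  // separate driver threads, then wait for all results in order.
  def fitAll(folds: Seq[Int]): Seq[Double] = {
    val futures = folds.map(f => Future(fitAndScore(f)))
    Await.result(Future.sequence(futures), 1.minute)
  }

  def main(args: Array[String]): Unit =
    println(fitAll(Seq(1, 2, 3)))
}
```

With Spark, the body of fitAndScore would be the per-fold training and evaluation code, so each thread submits its own jobs from the driver.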
On Jul 5, 2014 9:35 AM, "sparkuser2345"  wrote:

> Hi,
> I am trying to fit a logistic regression model with cross validation in
> Spark 0.9.0 using SVMWithSGD. I have created an array data_kfolded where
> each element is a pair of RDDs containing the training and test data:
> (training_data: RDD[org.apache.spark.mllib.regression.LabeledPoint],
> test_data: RDD[org.apache.spark.mllib.regression.LabeledPoint])
> scala> data_kfolded
> res21:
> Array[(org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint],
> org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint])]
> =
> Array((MappedRDD[9] at map at <console>:24,MappedRDD[7] at map at <console>:23),
> (MappedRDD[13] at map at <console>:24,MappedRDD[11] at map at <console>:23),
> (MappedRDD[17] at map at <console>:24,MappedRDD[15] at map at <console>:23))
> Everything works fine when using data_kfolded:
> val validationErrors =
> data_kfolded.map { datafold =>
>   val svmAlg = new SVMWithSGD()
>   val model_reg = svmAlg.run(datafold._1)
>   val labelAndPreds = datafold._2.map { point =>
>     val prediction = model_reg.predict(point.features)
>     (point.label, prediction)
>   }
>   val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble /
>     datafold._2.count
>   trainErr.toDouble
> }
> scala> validationErrors
> res1: Array[Double] = Array(0.8819836785938481, 0.07082521117608837,
> 0.29833546734955185)
> However, I have understood that the models are not fitted in parallel as
> data_kfolded is not an RDD (although it's an array of pairs of RDDs). When
> running the same code where data_kfolded has been replaced with
> sc.parallelize(data_kfolded), I get a null pointer exception from the line
> where the run method of the SVMWithSGD object is called with the training
> data. I guess this is somehow related to the fact that RDDs can't be
> accessed from inside a closure. I fail to understand though why the first
> version works and the second doesn't. Most importantly, is there a way to
> fit the models in parallel? I would really appreciate your help.
> val validationErrors =
> sc.parallelize(data_kfolded).map { datafold =>
>   val svmAlg = new SVMWithSGD()
>   val model_reg = svmAlg.run(datafold._1) // This line gives null pointer exception
>   val labelAndPreds = datafold._2.map { point =>
>     val prediction = model_reg.predict(point.features)
>     (point.label, prediction)
>   }
>   val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble /
>     datafold._2.count
>   trainErr.toDouble
> }
> validationErrors.collect
> java.lang.NullPointerException
> at org.apache.spark.rdd.RDD.firstParent(RDD.scala:971)
> at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
> at org.apache.spark.rdd.RDD.take(RDD.scala:824)
> at org.apache.spark.rdd.RDD.first(RDD.scala:856)
> at
> at $line28.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:36)
> at $line28.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34)
> at scala.collection.Iterator$$anon$
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
> at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
> at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
> at scala.collection.TraversableOnce$
> at
> at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
> at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
> at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
> at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
> at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:602)
> at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:602)
> at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:888)
> at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:888)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
> at
> at

Re: How to parallelize model fitting with different cross-validation folds?

2014-07-05 Thread Evan R. Sparks
To be clear - each of the RDDs is still a distributed dataset and each of
the individual SVM models will be trained in parallel across the cluster.
Sean's suggestion effectively has you submitting multiple Spark jobs
simultaneously, which, depending on your cluster configuration and the size
of your dataset, may or may not be a good idea.

There are some tricks you can do to make training multiple models on the
same dataset faster, which we're hoping to expose to users in an upcoming

- Evan


Re: How to parallelize model fitting with different cross-validation folds?

2014-07-05 Thread Christopher Nguyen
Hi sparkuser2345,

I'm inferring the problem statement is something like "how do I make this
complete faster (given my compute resources)?"

Several comments.

First, Spark only allows launching parallel tasks from the driver, not from
workers, which is why you're seeing the exception when you try. Whether the
latter is a sensible/doable idea is another discussion, but I can
appreciate why many people assume this should be possible.

Second, on optimization, you may be able to apply Sean's idea about
(thread) parallelism at the driver, combined with the knowledge that often
these cluster tasks bottleneck while competing for the same resources at
the same time (cpu vs disk vs network, etc.). You may be able to achieve
some performance optimization by randomizing these timings. This is not
unlike GMail randomizing user storage locations around the world for load
balancing. Here, you would partition each of your RDDs into a different
number of partitions, making some tasks larger than others, and thus some
may be in cpu-intensive map while others are shuffling data around the
network. This is rather cluster-specific; I'd be interested in what you
learn from such an exercise.

Third, I find it useful always to consider doing as much as possible in one
pass, subject to memory limits, e.g., mapPartitions() vs map(), thus
minimizing map/shuffle/reduce boundaries with their context switches and
data shuffling. In this case, notice how you're running the
training+prediction k times over mostly the same rows, with map/reduce
boundaries in between. While the training phase is sealed in this context,
you may be able to improve performance by collecting all k models
together and doing the [m x k] predictions all at once, which may end up being
faster.

Finally, as implied from the above, for the very common k-fold
cross-validation pattern, the algorithm itself might be written to be smart
enough to take both train and test data and "do the right thing" within
itself, thus obviating the need for the user to prepare k data sets and
run over them serially, and likely saving a lot of repeated
computations in the right internal places.
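
The third point can be illustrated with a small plain-Scala sketch that scores every row against all k models in a single pass, instead of running k separate prediction jobs. This is an assumption-laden illustration, not MLlib code: the Row type, the 0-based fold index, the weight arrays and the "margin > 0 means class 1.0" rule are all hypothetical stand-ins for already-trained linear models.

```scala
object OnePassEval {
  // Hypothetical row: label, feature vector, and the index (0-based)
  // of the fold in which this row is held out for testing.
  final case class Row(label: Double, features: Array[Double], fold: Int)

  // Linear prediction: class 1.0 if the margin is positive, else 0.0.
  def predict(w: Array[Double], x: Array[Double]): Double = {
    val margin = w.zip(x).map { case (a, b) => a * b }.sum
    if (margin > 0.0) 1.0 else 0.0
  }

  // One pass over the rows: compute the k predictions for each row and
  // count an error only for the model whose fold holds the row out.
  def errorRates(rows: Seq[Row], models: Array[Array[Double]]): Array[Double] = {
    val k = models.length
    val wrong = Array.fill(k)(0L)
    val total = Array.fill(k)(0L)
    for (r <- rows) {
      val preds = models.map(w => predict(w, r.features))
      total(r.fold) += 1
      if (preds(r.fold) != r.label) wrong(r.fold) += 1
    }
    Array.tabulate(k)(i => if (total(i) == 0) 0.0 else wrong(i).toDouble / total(i))
  }

  def main(args: Array[String]): Unit = {
    val models = Array(Array(1.0, -1.0), Array(-1.0, 1.0))
    val rows = Seq(
      Row(1.0, Array(2.0, 1.0), 0),
      Row(0.0, Array(1.0, 2.0), 0),
      Row(1.0, Array(2.0, 1.0), 1),
      Row(1.0, Array(1.0, 3.0), 1))
    println(errorRates(rows, models).mkString(", "))
  }
}
```

In Spark the loop body would sit inside a single map or mapPartitions over the data, with the k models shipped to the workers, so the rows are read once rather than k times.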

Christopher T. Nguyen
Co-founder & CEO, Adatao


Re: How to parallelize model fitting with different cross-validation folds?

2014-07-05 Thread Nick Pentreath
For linear models the 3rd option is by far the most efficient, and I suspect
it is what Evan is alluding to.

Unfortunately it's not directly possible with the classes in MLlib now, so
you'll have to roll your own using the underlying SGD / BFGS primitives.
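
As a rough illustration of what "rolling your own" could look like, here is a plain-Scala sketch of one gradient step that updates k linear models in a single pass over the data, where model m skips the rows held out in fold m. It is not MLlib code and makes several assumptions: labels are -1.0 or +1.0, the loss is an unregularized hinge loss, folds are 0-based, and the names MultiModelSgd, Row, dot and step are hypothetical.

```scala
object MultiModelSgd {
  // Hypothetical row: label in {-1.0, +1.0}, features, and the
  // 0-based fold in which the row is held out.
  final case class Row(label: Double, features: Array[Double], fold: Int)

  def dot(a: Array[Double], b: Array[Double]): Double = {
    var s = 0.0
    var j = 0
    while (j < a.length) { s += a(j) * b(j); j += 1 }
    s
  }

  // One batch subgradient step for all k models at once.
  // Model m is trained on every row whose fold is not m.
  def step(rows: Seq[Row],
           weights: Array[Array[Double]],
           stepSize: Double): Array[Array[Double]] = {
    val k = weights.length
    val d = weights(0).length
    val grads = Array.fill(k, d)(0.0)
    val counts = Array.fill(k)(0)
    for (r <- rows; m <- 0 until k if m != r.fold) {
      counts(m) += 1
      // Hinge loss contributes a subgradient only when the margin is below 1.
      if (r.label * dot(weights(m), r.features) < 1.0) {
        for (j <- 0 until d) grads(m)(j) -= r.label * r.features(j)
      }
    }
    Array.tabulate(k, d) { (m, j) =>
      weights(m)(j) - stepSize * grads(m)(j) / math.max(counts(m), 1)
    }
  }

  def main(args: Array[String]): Unit = {
    val rows = Seq(
      Row(1.0, Array(1.0, 0.0), 0),
      Row(-1.0, Array(0.0, 1.0), 1))
    val updated = step(rows, Array.fill(2, 2)(0.0), 1.0)
    updated.foreach(w => println(w.mkString(", ")))
  }
}
```

The point of the design is that the data is scanned once per iteration for all folds together, rather than once per fold.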


Re: How to parallelize model fitting with different cross-validation folds?

2014-07-07 Thread sparkuser2345
Thank you for all the replies! 

Realizing that I can't distribute the modelling with different
cross-validation folds to the cluster nodes this way (but to the threads
only), I decided not to create nfolds data sets but to parallelize the
calculation (threadwise) over folds and to zip the original dataset with a
sequence of indices indicating fold division: 
val data = sc.parallelize(orig_data zip fold_division)

(1 to nfolds).par.map { fold_i =>
  val svmAlg    = new SVMWithSGD()
  val tr_data   = data.filter(x => x._2 != fold_i).map(x => x._1)
  val test_data = data.filter(x => x._2 == fold_i).map(x => x._1)
  val model     = svmAlg.run(tr_data)
  val labelAndPreds = test_data.map { point =>
    val prediction = model.predict(point.features)
    (point.label, prediction)
  }
  val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble /
    test_data.count
  trainErr.toDouble
}

Really looking forward to the new functionalities in Spark 1.1!  
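
The fold-index approach above can be tried out on a local collection with a small plain-Scala sketch. How fold_division was built is not shown in the thread, so the round-robin assignment here is only an assumption, and the names FoldSplit, foldDivision and split are hypothetical.

```scala
object FoldSplit {
  // Hypothetical helper: assign item i to fold (i % nfolds) + 1,
  // so folds are numbered 1..nfolds as in the loop above.
  def foldDivision(n: Int, nfolds: Int): Seq[Int] =
    (0 until n).map(i => i % nfolds + 1)

  // Split zipped (item, fold) pairs into (training, test) items
  // for one held-out fold.
  def split[A](data: Seq[(A, Int)], fold: Int): (Seq[A], Seq[A]) = {
    val (test, train) = data.partition(_._2 == fold)
    (train.map(_._1), test.map(_._1))
  }

  def main(args: Array[String]): Unit = {
    val origData = Seq("a", "b", "c", "d", "e", "f")
    val data = origData zip foldDivision(origData.size, 3)
    for (fold <- 1 to 3) {
      val (train, test) = split(data, fold)
      println(s"fold $fold: train=$train test=$test")
    }
  }
}
```

A round-robin assignment only gives unbiased folds if the data is not ordered by label; shuffling the data or the fold indices first is usually safer.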
