Thank you for all the replies! 

Realizing that this approach distributes the model fitting for the
different cross-validation folds only across driver threads, not across
the cluster nodes, I decided not to create nfolds data sets but instead
to parallelize the calculation (thread-wise) over the folds and to zip
the original dataset with a sequence of indices indicating the fold
division:
 
val data = sc.parallelize(orig_data zip fold_division)

(1 to nfolds).par.map { fold_i =>
  val svmAlg    = new SVMWithSGD()
  val tr_data   = data.filter(x => x._2 != fold_i).map(x => x._1)
  val test_data = data.filter(x => x._2 == fold_i).map(x => x._1)
  val model     = svmAlg.run(tr_data)
  val labelAndPreds = test_data.map { point =>
    val prediction = model.predict(point.features)
    (point.label, prediction)
  }
  // Fraction of misclassified points in the held-out fold.
  val testErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble /
    test_data.count
  testErr
}
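For reference, the fold_division sequence zipped in above can be built in plain Scala before parallelizing. A minimal sketch (foldDivision is a hypothetical helper, not part of MLlib) that assigns each of n examples a fold index in 1..nfolds, shuffled so the folds stay balanced:

```scala
import scala.util.Random

// Assign each of n examples a fold index in 1..nfolds. Taking i % nfolds
// keeps the folds (nearly) equally sized; shuffling randomizes which
// examples land in which fold.
def foldDivision(n: Int, nfolds: Int, seed: Long = 42L): Seq[Int] = {
  val rng = new Random(seed)
  rng.shuffle((0 until n).map(i => i % nfolds + 1))
}
```

Fixing the seed keeps the fold assignment reproducible across runs.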

Really looking forward to the new functionality in Spark 1.1!



Nick Pentreath wrote
> For linear models the 3rd option is by far the most efficient, and I
> suspect it is what Evan is alluding to.
> 
> 
> Unfortunately it's not directly possible with the classes in MLlib now,
> so you'll have to roll your own using the underlying SGD/BFGS primitives.
> —
> Sent from Mailbox
> 
> On Sat, Jul 5, 2014 at 10:45 AM, Christopher Nguyen <ctn@> wrote:
> 
>> Hi sparkuser2345,
>> I'm inferring the problem statement is something like "how do I make this
>> complete faster (given my compute resources)?"
>> Several comments.
>> First, Spark only allows launching parallel tasks from the driver, not
>> from workers, which is why you're seeing the exception when you try.
>> Whether the latter is a sensible/doable idea is another discussion, but
>> I can appreciate why many people assume this should be possible.
>> Second, on optimization, you may be able to apply Sean's idea about
>> (thread) parallelism at the driver, combined with the knowledge that
>> often these cluster tasks bottleneck while competing for the same
>> resources at the same time (cpu vs disk vs network, etc.). You may be
>> able to achieve some performance optimization by randomizing these
>> timings. This is not unlike Gmail randomizing user storage locations
>> around the world for load balancing. Here, you would partition each of
>> your RDDs into a different number of partitions, making some tasks
>> larger than others, and thus some may be in a cpu-intensive map while
>> others are shuffling data around the network. This is rather
>> cluster-specific; I'd be interested in what you learn from such an
>> exercise.
>> Third, I find it useful always to consider doing as much as possible
>> in one pass, subject to memory limits, e.g., mapPartitions() vs map(),
>> thus minimizing map/shuffle/reduce boundaries with their context
>> switches and data shuffling. In this case, notice how you're running
>> the training+prediction k times over mostly the same rows, with
>> map/reduce boundaries in between. While the training phase is sealed
>> in this context, you may be able to improve performance by collecting
>> all the k models together and doing [m x k] predictions all at once,
>> which may end up being faster.
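To illustrate the point about evaluating k collected models in a single pass, here is a hedged, Spark-free sketch (Point, dot, and predictAll are made up for illustration; the rule dot(w, x) > 0 stands in for a bias-free linear classifier):

```scala
case class Point(label: Double, features: Array[Double])

// Dot product of a weight vector with a feature vector.
def dot(w: Array[Double], x: Array[Double]): Double =
  w.zip(x).map { case (a, b) => a * b }.sum

// One pass over a row produces predictions from all k models at once,
// instead of re-scanning the data once per model.
def predictAll(models: Seq[Array[Double]], p: Point): Seq[Double] =
  models.map(w => if (dot(w, p.features) > 0) 1.0 else 0.0)
```

In a Spark job, predictAll would run inside a single map over the test rows, with the k weight vectors broadcast to the workers.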
>> Finally, as implied from the above, for the very common k-fold
>> cross-validation pattern, the algorithm itself might be written to be
>> smart enough to take both train and test data and "do the right thing"
>> within itself, thus obviating the need for the user to prepare k data
>> sets and run over them serially, and likely saving a lot of repeated
>> computations in the right internal places.
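A hypothetical driver-side shape for such a cross-validation-aware entry point might look as follows (crossValidate is an illustrative sketch, not an existing MLlib API): the caller passes one dataset with a fold label per row and gets back the k held-out error values.

```scala
// Hold out fold k, train on the rest, and score the held-out rows.
// `train` builds a model M from the training rows; `testErr` scores M
// on the held-out rows. Both are supplied by the caller.
def crossValidate[A, M](rows: Seq[(A, Int)], nfolds: Int)
                       (train: Seq[A] => M)
                       (testErr: (M, Seq[A]) => Double): Seq[Double] =
  (1 to nfolds).map { k =>
    val (test, tr) = rows.partition(_._2 == k)
    testErr(train(tr.map(_._1)), test.map(_._1))
  }
```

A real implementation would take RDDs rather than Seqs and could share work (e.g. caching, feature statistics) across the k fits internally.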
>> Enjoy,
>> --
>> Christopher T. Nguyen
>> Co-founder & CEO, Adatao <http://adatao.com>
>> linkedin.com/in/ctnguyen
>> On Sat, Jul 5, 2014 at 1:50 AM, Sean Owen <sowen@> wrote:
>>> If you call .par on data_kfolded it will become a parallel collection
>>> in Scala, and so the maps will happen in parallel.
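The .par suggestion relies on Scala's parallel collections (built into Scala 2.9-2.12; a separate module in 2.13+). An equivalent driver-side sketch using Futures, with the expensive training call replaced by a placeholder fitFold function for illustration:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Placeholder standing in for "train on fold i and return its test
// error"; in the real job this is where svmAlg.run(...) would be called.
def fitFold(i: Int): Double = i.toDouble

// Launch one driver-side task per fold, then wait for all results.
val futures = (1 to 3).map(i => Future(fitFold(i)))
val errors  = Await.result(Future.sequence(futures), 30.seconds)
```

Either way, the parallelism is on the driver: each thread submits its own Spark job, and the cluster interleaves the resulting tasks.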
>>> On Jul 5, 2014 9:35 AM, "sparkuser2345" <hm.spark.user@> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am trying to fit a logistic regression model with cross validation in
>>>> Spark 0.9.0 using SVMWithSGD. I have created an array data_kfolded
>>>> where
>>>> each element is a pair of RDDs containing the training and test data:
>>>>
>>>> (training_data: RDD[org.apache.spark.mllib.regression.LabeledPoint],
>>>> test_data: RDD[org.apache.spark.mllib.regression.LabeledPoint])
>>>>
>>>> scala> data_kfolded
>>>> res21:
>>>> Array[(org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint],
>>>> org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint])] =
>>>> Array((MappedRDD[9] at map at <console>:24,MappedRDD[7] at map at <console>:23),
>>>> (MappedRDD[13] at map at <console>:24,MappedRDD[11] at map at <console>:23),
>>>> (MappedRDD[17] at map at <console>:24,MappedRDD[15] at map at <console>:23))
>>>>
>>>> Everything works fine when using data_kfolded:
>>>>
>>>> val validationErrors = data_kfolded.map { datafold =>
>>>>   val svmAlg = new SVMWithSGD()
>>>>   val model_reg = svmAlg.run(datafold._1)
>>>>   val labelAndPreds = datafold._2.map { point =>
>>>>     val prediction = model_reg.predict(point.features)
>>>>     (point.label, prediction)
>>>>   }
>>>>   val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / datafold._2.count
>>>>   trainErr.toDouble
>>>> }
>>>>
>>>> scala> validationErrors
>>>> res1: Array[Double] = Array(0.8819836785938481, 0.07082521117608837,
>>>> 0.29833546734955185)
>>>>
>>>> However, I have understood that the models are not fitted in
>>>> parallel, as data_kfolded is not an RDD (although it's an array of
>>>> pairs of RDDs). When running the same code where data_kfolded has
>>>> been replaced with sc.parallelize(data_kfolded), I get a null pointer
>>>> exception from the line where the run method of the SVMWithSGD object
>>>> is called with the training data. I guess this is somehow related to
>>>> the fact that RDDs can't be accessed from inside a closure. I fail to
>>>> understand though why the first version works and the second doesn't.
>>>> Most importantly, is there a way to fit the models in parallel? I
>>>> would really appreciate your help.
>>>>
>>>> val validationErrors = sc.parallelize(data_kfolded).map { datafold =>
>>>>   val svmAlg = new SVMWithSGD()
>>>>   val model_reg = svmAlg.run(datafold._1) // This line gives null pointer exception
>>>>   val labelAndPreds = datafold._2.map { point =>
>>>>     val prediction = model_reg.predict(point.features)
>>>>     (point.label, prediction)
>>>>   }
>>>>   val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / datafold._2.count
>>>>   trainErr.toDouble
>>>> }
>>>> validationErrors.collect
>>>>
>>>> java.lang.NullPointerException
>>>>         at org.apache.spark.rdd.RDD.firstParent(RDD.scala:971)
>>>>         at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
>>>>         at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
>>>>         at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
>>>>         at scala.Option.getOrElse(Option.scala:120)
>>>>         at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
>>>>         at org.apache.spark.rdd.RDD.take(RDD.scala:824)
>>>>         at org.apache.spark.rdd.RDD.first(RDD.scala:856)
>>>>         at org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:121)
>>>>         at $line28.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:36)
>>>>         at $line28.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34)
>>>>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>>>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>         at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>>>         at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>>>         at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>>>>         at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>>>>         at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>>>>         at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>>>>         at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>>>>         at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>>>>         at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>>>>         at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:602)
>>>>         at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:602)
>>>>         at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:888)
>>>>         at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:888)
>>>>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
>>>>         at org.apache.spark.scheduler.Task.run(Task.scala:53)
>>>>         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
>>>>         at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
>>>>         at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
>>>>         at java.security.AccessController.doPrivileged(Native Method)
>>>>         at javax.security.auth.Subject.doAs(Subject.java:415)
>>>>         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
>>>>         at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
>>>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>         at java.lang.Thread.run(Thread.java:744)
>>>>
>>>>
>>>>
>>>>
>>>>
>>>





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-parallelize-model-fitting-with-different-cross-validation-folds-tp8839p8904.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
