Thank you for all the replies! Realizing that I can't distribute the modelling with different cross-validation folds to the cluster nodes this way (but to the threads only), I decided not to create nfolds data sets but to parallelize the calculation (threadwise) over folds, and to zip the original dataset with a sequence of indices indicating the fold division:

val data = sc.parallelize(orig_data zip fold_division)
(1 to nfolds).par.map { fold_i =>
  val svmAlg = new SVMWithSGD()
  val tr_data   = data.filter(x => x._2 != fold_i).map(x => x._1)
  val test_data = data.filter(x => x._2 == fold_i).map(x => x._1)
  val model = svmAlg.run(tr_data)
  val labelAndPreds = test_data.map { point =>
    val prediction = model.predict(point.features)
    (point.label, prediction)
  }
  // Fraction of misclassified points in the held-out fold.
  val testErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble / test_data.count
  testErr
}

Really looking forward to the new functionality in Spark 1.1!

Nick Pentreath wrote:
> For linear models the 3rd option is by far the most efficient, and I
> suspect it's what Evan is alluding to.
>
> Unfortunately it's not directly possible with the classes in MLlib now, so
> you'll have to roll your own using the underlying SGD/BFGS primitives.
> —
> Sent from Mailbox
>
> On Sat, Jul 5, 2014 at 10:45 AM, Christopher Nguyen <ctn@...> wrote:
>
>> Hi sparkuser2345,
>>
>> I'm inferring the problem statement is something like "how do I make this
>> complete faster (given my compute resources)?" Several comments.
>>
>> First, Spark only allows launching parallel tasks from the driver, not
>> from workers, which is why you're seeing the exception when you try.
>> Whether the latter is a sensible/doable idea is another discussion, but I
>> can appreciate why many people assume this should be possible.
>>
>> Second, on optimization, you may be able to apply Sean's idea about
>> (thread) parallelism at the driver, combined with the knowledge that
>> often these cluster tasks bottleneck while competing for the same
>> resources at the same time (CPU vs. disk vs. network, etc.). You may be
>> able to achieve some performance optimization by randomizing these
>> timings. This is not unlike GMail randomizing user storage locations
>> around the world for load balancing.
>> Here, you would partition each of your RDDs into a different number of
>> partitions, making some tasks larger than others, so that some may be in
>> a CPU-intensive map while others are shuffling data around the network.
>> This is rather cluster-specific; I'd be interested in what you learn from
>> such an exercise.
>>
>> Third, I find it useful always to consider doing as much as possible in
>> one pass, subject to memory limits, e.g., mapPartitions() vs. map(), thus
>> minimizing map/shuffle/reduce boundaries with their context switches and
>> data shuffling. In this case, notice how you're running the
>> training+prediction k times over mostly the same rows, with map/reduce
>> boundaries in between. While the training phase is sealed in this
>> context, you may be able to improve performance by collecting all the k
>> models together and doing [m x k] predictions all at once, which may end
>> up being faster.
>>
>> Finally, as implied from the above, for the very common k-fold
>> cross-validation pattern, the algorithm itself might be written to be
>> smart enough to take both train and test data and "do the right thing"
>> within itself, thus obviating the need for the user to prepare k data
>> sets and run over them serially, and likely saving a lot of repeated
>> computations in the right internal places.
>>
>> Enjoy,
>> --
>> Christopher T. Nguyen
>> Co-founder & CEO, Adatao <http://adatao.com>
>> linkedin.com/in/ctnguyen
>>
>> On Sat, Jul 5, 2014 at 1:50 AM, Sean Owen <sowen@...> wrote:
>>
>>> If you call .par on data_kfolded it will become a parallel collection in
>>> Scala and so the maps will happen in parallel.
>>>
>>> On Jul 5, 2014 9:35 AM, "sparkuser2345" <hm.spark.user@...> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am trying to fit a logistic regression model with cross validation in
>>>> Spark 0.9.0 using SVMWithSGD.
>>>> I have created an array data_kfolded where each element is a pair of
>>>> RDDs containing the training and test data:
>>>>
>>>> (training_data: RDD[org.apache.spark.mllib.regression.LabeledPoint],
>>>>  test_data: RDD[org.apache.spark.mllib.regression.LabeledPoint])
>>>>
>>>> scala> data_kfolded
>>>> res21:
>>>> Array[(org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint],
>>>> org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint])] =
>>>> Array((MappedRDD[9] at map at <console>:24, MappedRDD[7] at map at <console>:23),
>>>> (MappedRDD[13] at map at <console>:24, MappedRDD[11] at map at <console>:23),
>>>> (MappedRDD[17] at map at <console>:24, MappedRDD[15] at map at <console>:23))
>>>>
>>>> Everything works fine when using data_kfolded:
>>>>
>>>> val validationErrors =
>>>>   data_kfolded.map { datafold =>
>>>>     val svmAlg = new SVMWithSGD()
>>>>     val model_reg = svmAlg.run(datafold._1)
>>>>     val labelAndPreds = datafold._2.map { point =>
>>>>       val prediction = model_reg.predict(point.features)
>>>>       (point.label, prediction)
>>>>     }
>>>>     val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble /
>>>>       datafold._2.count
>>>>     trainErr.toDouble
>>>>   }
>>>>
>>>> scala> validationErrors
>>>> res1: Array[Double] = Array(0.8819836785938481, 0.07082521117608837,
>>>> 0.29833546734955185)
>>>>
>>>> However, I have understood that the models are not fitted in parallel,
>>>> as data_kfolded is not an RDD (although it's an array of pairs of RDDs).
>>>> When running the same code where data_kfolded has been replaced with
>>>> sc.parallelize(data_kfolded), I get a null pointer exception from the
>>>> line where the run method of the SVMWithSGD object is called with the
>>>> training data. I guess this is somehow related to the fact that RDDs
>>>> can't be accessed from inside a closure.
>>>> I fail to understand though why the first version works and the second
>>>> doesn't. Most importantly, is there a way to fit the models in
>>>> parallel? I would really appreciate your help.
>>>>
>>>> val validationErrors =
>>>>   sc.parallelize(data_kfolded).map { datafold =>
>>>>     val svmAlg = new SVMWithSGD()
>>>>     val model_reg = svmAlg.run(datafold._1) // This line gives null pointer exception
>>>>     val labelAndPreds = datafold._2.map { point =>
>>>>       val prediction = model_reg.predict(point.features)
>>>>       (point.label, prediction)
>>>>     }
>>>>     val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble /
>>>>       datafold._2.count
>>>>     trainErr.toDouble
>>>>   }
>>>> validationErrors.collect
>>>>
>>>> java.lang.NullPointerException
>>>>   at org.apache.spark.rdd.RDD.firstParent(RDD.scala:971)
>>>>   at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
>>>>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
>>>>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
>>>>   at scala.Option.getOrElse(Option.scala:120)
>>>>   at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
>>>>   at org.apache.spark.rdd.RDD.take(RDD.scala:824)
>>>>   at org.apache.spark.rdd.RDD.first(RDD.scala:856)
>>>>   at org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:121)
>>>>   at $line28.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:36)
>>>>   at $line28.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34)
>>>>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>>>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>>>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>>>   at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>>>   at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>>>   at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>>>>   at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>>>>   at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>>>>   at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>>>>   at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>>>>   at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>>>>   at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>>>>   at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:602)
>>>>   at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:602)
>>>>   at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:888)
>>>>   at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:888)
>>>>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
>>>>   at org.apache.spark.scheduler.Task.run(Task.scala:53)
>>>>   at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
>>>>   at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
>>>>   at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
>>>>   at java.security.AccessController.doPrivileged(Native Method)
>>>>   at javax.security.auth.Subject.doAs(Subject.java:415)
>>>>   at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
>>>>   at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
>>>>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
>>>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>   at java.lang.Thread.run(Thread.java:744)
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-parallelize-model-fitting-with-different-cross-validation-folds-tp8839.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-parallelize-model-fitting-with-different-cross-validation-folds-tp8839p8904.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
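[Editor's note] The fold-parallel pattern discussed in this thread (assign each row a fold index, then train/evaluate the k folds concurrently from the driver) can be illustrated without a Spark cluster. Below is a minimal, Spark-free Scala sketch: scala.concurrent.Future stands in for the .par parallel collection, plain Seq operations stand in for RDD filter/map, and a toy majority-class "model" stands in for SVMWithSGD. All names (nfolds, fit, folded) are illustrative, not code from the thread.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Random

// Toy stand-in for model fitting: predicts the majority class of the
// training labels, for every input. A real run would call SVMWithSGD.run.
def fit(train: Seq[(Double, Double)]): Double => Double = {
  val majority = if (train.count(_._2 == 1.0) * 2 >= train.size) 1.0 else 0.0
  _ => majority
}

val nfolds = 3
val rng    = new Random(42)
// (feature, label) rows; the label loosely follows the sign of the feature.
val data = Seq.fill(90) { val x = rng.nextGaussian(); (x, if (x > 0) 1.0 else 0.0) }

// Tag each row with a fold index 1..nfolds, mirroring `orig_data zip fold_division`.
val folded = data.zipWithIndex.map { case (row, i) => (row, i % nfolds + 1) }

// Train and evaluate the folds concurrently on driver-side threads,
// analogous to `(1 to nfolds).par.map { ... }` in the post.
val errsF = Future.traverse(1 to nfolds) { k =>
  Future {
    val train = folded.collect { case (row, f) if f != k => row }
    val test  = folded.collect { case (row, f) if f == k => row }
    val model = fit(train)
    // Fraction of misclassified points in the held-out fold.
    test.count { case (x, label) => model(x) != label }.toDouble / test.size
  }
}
val errs = Await.result(errsF, Duration.Inf)
println(errs.mkString(", "))
```

Note that this only parallelizes across folds on driver threads; each Spark job submitted inside a thread still runs on the cluster as usual, which is exactly Sean's `.par` suggestion above.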