To be clear: each of the RDDs is still a distributed dataset, and each of
the individual SVM models will still be trained in parallel across the
cluster. Sean's suggestion effectively has you submitting multiple Spark
jobs simultaneously from the driver, which, depending on your cluster
configuration and the size of your dataset, may or may not be a good idea.
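
As a rough sketch of what Sean's suggestion amounts to, assuming the fold
layout from the original question (an array of training/test RDD pairs):
the function name crossValidate is hypothetical and only for illustration.
The outer collection is a local Scala parallel collection living on the
driver, so each fold launches its own Spark jobs from a separate driver
thread, while the RDD operations inside remain ordinary distributed jobs.

```scala
import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

// Hypothetical helper: one validation error per (training, test) fold.
def crossValidate(
    folds: Array[(RDD[LabeledPoint], RDD[LabeledPoint])]): Array[Double] =
  // .par turns the local array into a parallel collection, so the body
  // below runs on several driver threads at once (one per fold, subject
  // to the size of the default thread pool).
  folds.par.map { case (training, test) =>
    // Each call to run() submits its own distributed Spark jobs.
    val model = new SVMWithSGD().run(training)
    val misclassified = test
      .map(point => (point.label, model.predict(point.features)))
      .filter { case (label, prediction) => label != prediction }
      .count()
    // Fraction of misclassified points in the held-out fold.
    misclassified.toDouble / test.count()
  }.toArray
```

You would call it as crossValidate(data_kfolded). Whether the concurrent
jobs actually overlap depends on how many free task slots the cluster has.
Jobs within one application are scheduled FIFO by default; setting
spark.scheduler.mode to FAIR is one option worth trying if a single fold
ends up monopolizing the cluster.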

There are some tricks you can use to make training multiple models on the
same dataset faster, which we're hoping to expose to users in an upcoming
release.

- Evan


On Sat, Jul 5, 2014 at 1:50 AM, Sean Owen <so...@cloudera.com> wrote:

> If you call .par on data_kfolded it will become a parallel collection in
> Scala and so the maps will happen in parallel.
> On Jul 5, 2014 9:35 AM, "sparkuser2345" <hm.spark.u...@gmail.com> wrote:
>
>> Hi,
>>
>> I am trying to fit a logistic regression model with cross validation in
>> Spark 0.9.0 using SVMWithSGD. I have created an array data_kfolded where
>> each element is a pair of RDDs containing the training and test data:
>>
>> (training_data: RDD[org.apache.spark.mllib.regression.LabeledPoint],
>>  test_data: RDD[org.apache.spark.mllib.regression.LabeledPoint])
>>
>> scala> data_kfolded
>> res21: Array[(org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint], org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint])] =
>> Array((MappedRDD[9] at map at <console>:24,MappedRDD[7] at map at <console>:23),
>>   (MappedRDD[13] at map at <console>:24,MappedRDD[11] at map at <console>:23),
>>   (MappedRDD[17] at map at <console>:24,MappedRDD[15] at map at <console>:23))
>>
>> Everything works fine when using data_kfolded:
>>
>> import org.apache.spark.mllib.classification.SVMWithSGD
>>
>> val validationErrors =
>> data_kfolded.map { datafold =>
>>   val svmAlg = new SVMWithSGD()
>>   val model_reg = svmAlg.run(datafold._1)
>>   val labelAndPreds = datafold._2.map { point =>
>>     val prediction = model_reg.predict(point.features)
>>     (point.label, prediction)
>>   }
>>   // Fraction of misclassified points in the held-out (test) fold
>>   val validationErr =
>>     labelAndPreds.filter(r => r._1 != r._2).count.toDouble / datafold._2.count
>>   validationErr
>> }
>>
>> scala> validationErrors
>> res1: Array[Double] = Array(0.8819836785938481, 0.07082521117608837,
>> 0.29833546734955185)
>>
>> However, I have understood that the models are not fitted in parallel as
>> data_kfolded is not an RDD (although it's an array of pairs of RDDs). When
>> running the same code where data_kfolded has been replaced with
>> sc.parallelize(data_kfolded), I get a null pointer exception from the line
>> where the run method of the SVMWithSGD object is called with the training
>> data. I guess this is somehow related to the fact that RDDs can't be
>> accessed from inside a closure. I fail to understand though why the first
>> version works and the second doesn't. Most importantly, is there a way to
>> fit the models in parallel? I would really appreciate your help.
>>
>> val validationErrors =
>> sc.parallelize(data_kfolded).map { datafold =>
>>   val svmAlg = new SVMWithSGD()
>>   // This line gives the null pointer exception
>>   val model_reg = svmAlg.run(datafold._1)
>>   val labelAndPreds = datafold._2.map { point =>
>>     val prediction = model_reg.predict(point.features)
>>     (point.label, prediction)
>>   }
>>   // Fraction of misclassified points in the held-out (test) fold
>>   val validationErr =
>>     labelAndPreds.filter(r => r._1 != r._2).count.toDouble / datafold._2.count
>>   validationErr
>> }
>> validationErrors.collect
>>
>> java.lang.NullPointerException
>>         at org.apache.spark.rdd.RDD.firstParent(RDD.scala:971)
>>         at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
>>         at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
>>         at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
>>         at scala.Option.getOrElse(Option.scala:120)
>>         at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
>>         at org.apache.spark.rdd.RDD.take(RDD.scala:824)
>>         at org.apache.spark.rdd.RDD.first(RDD.scala:856)
>>         at org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:121)
>>         at $line28.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:36)
>>         at $line28.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34)
>>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>         at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>>         at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>>         at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>>         at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>>         at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>>         at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>>         at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>>         at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>>         at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>>         at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:602)
>>         at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:602)
>>         at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:888)
>>         at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:888)
>>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
>>         at org.apache.spark.scheduler.Task.run(Task.scala:53)
>>         at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
>>         at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
>>         at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
>>         at java.security.AccessController.doPrivileged(Native Method)
>>         at javax.security.auth.Subject.doAs(Subject.java:415)
>>         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
>>         at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
>>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>         at java.lang.Thread.run(Thread.java:744)
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-parallelize-model-fitting-with-different-cross-validation-folds-tp8839.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>
