If you call .par on data_kfolded it will become a parallel collection in
Scala and so the maps will happen in parallel .
On Jul 5, 2014 9:35 AM, "sparkuser2345" <hm.spark.u...@gmail.com> wrote:

> Hi,
>
> I am trying to fit a logistic regression model with cross validation in
> Spark 0.9.0 using SVMWithSGD. I have created an array data_kfolded where
> each element is a pair of RDDs containing the training and test data:
>
> (training_data: (RDD[org.apache.spark.mllib.regression.LabeledPoint],
> test_data: (RDD[org.apache.spark.mllib.regression.LabeledPoint])
>
> scala> data_kfolded
> res21:
>
> Array[(org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint],
> org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint])]
> =
> Array((MappedRDD[9] at map at <console>:24,MappedRDD[7] at map at
> <console>:23), (MappedRDD[13] at map at <console>:24,MappedRDD[11] at map
> at
> <console>:23), (MappedRDD[17] at map at <console>:24,MappedRDD[15] at map
> at
> <console>:23))
>
> Everything works fine when using data_kfolded:
>
> val validationErrors =
> data_kfolded.map { datafold =>
>   val svmAlg = new SVMWithSGD()
>   val model_reg = svmAlg.run(datafold._1)
>   val labelAndPreds = datafold._2.map { point =>
>     val prediction = model_reg.predict(point.features)
>     (point.label, prediction)
>   }
>   val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble /
> datafold._2.count
>   trainErr.toDouble
> }
>
> scala> validationErrors
> res1: Array[Double] = Array(0.8819836785938481, 0.07082521117608837,
> 0.29833546734955185)
>
> However, I have understood that the models are not fitted in parallel as
> data_kfolded is not an RDD (although it's an array of pairs of RDDs). When
> running the same code where data_kfolded has been replaced with
> sc.parallelize(data_kfolded), I get a null pointer exception from the line
> where the run method of the SVMWithSGD object is called with the traning
> data. I guess this is somehow related to the fact that RDDs can't be
> accessed from inside a closure. I fail to understand though why the first
> version works and the second doesn't. Most importantly, is there a way to
> fit the models in parallel? I would really appreciate your help.
>
> val validationErrors =
> sc.parallelize(data_kfolded).map { datafold =>
>   val svmAlg = new SVMWithSGD()
>   val model_reg = svmAlg.run(datafold._1) // This line gives null pointer
> exception
>   val labelAndPreds = datafold._2.map { point =>
>     val prediction = model_reg.predict(point.features)
>     (point.label, prediction)
>   }
>   val trainErr = labelAndPreds.filter(r => r._1 != r._2).count.toDouble /
> datafold._2.count
>   trainErr.toDouble
> }
> validationErrors.collect
>
> java.lang.NullPointerException
>         at org.apache.spark.rdd.RDD.firstParent(RDD.scala:971)
>         at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
>         at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:207)
>         at
> org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:205)
>         at scala.Option.getOrElse(Option.scala:120)
>         at org.apache.spark.rdd.RDD.partitions(RDD.scala:205)
>         at org.apache.spark.rdd.RDD.take(RDD.scala:824)
>         at org.apache.spark.rdd.RDD.first(RDD.scala:856)
>         at
>
> org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:121)
>         at
> $line28.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:36)
>         at
> $line28.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:34)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>         at
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>         at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>         at
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>         at
> scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>         at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>         at
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>         at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>         at
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>         at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>         at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:602)
>         at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:602)
>         at
>
> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:888)
>         at
>
> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:888)
>         at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
>         at org.apache.spark.scheduler.Task.run(Task.scala:53)
>         at
>
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:213)
>         at
>
> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
>         at
>
> org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:415)
>         at
>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
>         at
> org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
>         at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
>         at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:744)
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-parallelize-model-fitting-with-different-cross-validation-folds-tp8839.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>

Reply via email to