Hi Saurabh,

Here your ratesAndPreds is an RDD of type [((Int, Int), (Double, Double))],
not an Array. If you want to save it to disk, you can simply call
saveAsTextFile on it and provide the output location.

So change your last line from this:

ratesAndPreds.foreach(pw.println)


to this:

ratesAndPreds.saveAsTextFile("/path/CFOutput")

Note that saveAsTextFile writes a directory containing part files
(part-00000, part-00001, ...), one per partition — not a single file.





Thanks
Best Regards

On Mon, Nov 24, 2014 at 5:05 PM, Saurabh Agrawal <saurabh.agra...@markit.com
> wrote:

>    import org.apache.spark.mllib.recommendation.ALS
>
> import org.apache.spark.mllib.recommendation.Rating
>
>
>
> // Load and parse the data
>
> val data = sc.textFile("/path/CFReady.txt")
>
> val ratings = data.map(_.split('\t') match { case Array(user, item, rate)
> =>
>
>     Rating(user.toInt, item.toInt, rate.toDouble)
>
>   })
>
>
>
> // Build the recommendation model using ALS
>
> val rank = 50
>
> val numIterations = 100
>
> val model = ALS.train(ratings, rank, numIterations, 0.10)
>
>
>
> // Evaluate the model on rating data
>
> val usersProducts = ratings.map { case Rating(user, product, rate) =>
>
>   (user, product)
>
> }
>
> val predictions =
>
>   model.predict(usersProducts).map { case Rating(user, product, rate) =>
>
>     ((user, product), rate)
>
>   }
>
> val ratesAndPreds = ratings.map { case Rating(user, product, rate) =>
>
>   ((user, product), rate)
>
> }.join(predictions)
>
> val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) =>
>
>   val err = (r1 - r2)
>
>   err * err
>
> }.mean()
>
> println("Mean Squared Error = " + MSE)
>
>
>
> val pw = new PrintWriter(new File("/path/CFOutput.txt"))
>
>
>
> ratesAndPreds.foreach(pw.println)
>
> }
>
>
>
>
>
> Hi,
>
>
>
> Consider the highlighted code. I am trying to write the output of the
> ratesAndPreds RDD onto the disk, but I get the error
>
>
>
> Task not serializable
>
>                 at
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>
>                 at
> org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>
>                 at
> org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
>
>                 at org.apache.spark.rdd.RDD.foreach(RDD.scala:758)
>
>                 at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:36)
>
>                 at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41)
>
>                 at $iwC$$iwC$$iwC$$iwC.<init>(<console>:43)
>
>                 at $iwC$$iwC$$iwC.<init>(<console>:45)
>
>                 at $iwC$$iwC.<init>(<console>:47)
>
>                 at $iwC.<init>(<console>:49)
>
>                 at <init>(<console>:51)
>
>                 at .<init>(<console>:55)
>
>                 at .<clinit>(<console>)
>
>                 at .<init>(<console>:7)
>
>                 at .<clinit>(<console>)
>
>                 at $print(<console>)
>
>                 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
>
>                 at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>
>                 at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
>                 at java.lang.reflect.Method.invoke(Method.java:606)
>
>                 at
> org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
>
>                 at
> org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
>
>                 at
> org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
>
>                 at
> org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
>
>                 at
> org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
>
>                 at
> org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
>
>                 at
> org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
>
>                 at
> org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
>
>                 at
> org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
>
>                 at
> org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
>
>                 at
> org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
>
>                 at
> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
>
>                 at
> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
>
>                 at
> org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
>
>                 at
> scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
>
>                 at
> org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
>
>                 at
> org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
>
>                 at org.apache.spark.repl.Main$.main(Main.scala:31)
>
>                 at org.apache.spark.repl.Main.main(Main.scala)
>
>                 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
>
>                 at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>
>                 at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
>                 at java.lang.reflect.Method.invoke(Method.java:606)
>
>                 at
> org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
>
>                 at
> org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>
>                 at
> org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> Caused by: java.io.NotSerializableException: java.io.PrintWriter
>
>                 at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>
>                 at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>
>                 at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>
>                 at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>
>                 at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>
>                 at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>
>                 at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>
>                 at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>
>                 at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>
>                 at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>
>                 at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>
>                 at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>
>                 at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>
>                 at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>
>                 at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>
>                 at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>
>                 at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>
>                 at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>
>                 at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>
>                 at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>
>                 at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>
>                 at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>
>                 at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>
>                 at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>
>                 at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>
>                 at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>
>                 at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>
>                 at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>
>                 at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>
>                 at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>
>                 at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>
>                 at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>
>                 at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>
>                 at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>
>                 at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>
>                 at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>
>                 at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>
>                 at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>
>                 at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>
>                 at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>
>                 at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>
>                 at
> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>
>                 at
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
>
>                 at
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
>
>                 at
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
>
>                 ... 45 more
>
>
>
> I am very new to Spark and Scala, can somebody please guide me here?
>
>
>
> Thanks,
>
> Saurabh
>
> ------------------------------
> This e-mail, including accompanying communications and attachments, is
> strictly confidential and only for the intended recipient. Any retention,
> use or disclosure not expressly authorised by Markit is prohibited. This
> email is subject to all waivers and other terms at the following link:
> http://www.markit.com/en/about/legal/email-disclaimer.page
>
> Please visit http://www.markit.com/en/about/contact/contact-us.page? for
> contact information on our offices worldwide.
>
> MarkitSERV Limited has its registered office located at Level 4, Ropemaker
> Place, 25 Ropemaker Street, London, EC2Y 9LY and is authorized and
> regulated by the Financial Conduct Authority with registration number 207294
>

Reply via email to