To get the results in a single file, you could do a repartition(1) and then save it:

ratesAndPreds.repartition(1).saveAsTextFile("/path/CFOutput")

Thanks
Best Regards

On Mon, Nov 24, 2014 at 8:32 PM, Saurabh Agrawal <saurabh.agra...@markit.com> wrote:

> Thanks for your help Akhil. However, this is creating an output folder and
> storing the result sets in multiple files. Also, the record count in the
> result set seems to have multiplied!! Is there any other way to achieve
> this?
>
> Thanks!!
>
> Regards,
> Saurabh Agrawal
> Vice President
>
> Markit
> Green Boulevard
> B-9A, Tower C
> 3rd Floor, Sector - 62,
> Noida 201301, India
> +91 120 611 8274 Office
>
> *From:* Akhil Das [mailto:ak...@sigmoidanalytics.com]
> *Sent:* Monday, November 24, 2014 5:55 PM
> *To:* Saurabh Agrawal
> *Cc:* user@spark.apache.org
> *Subject:* Re: Writing collection to file error
>
> Hi Saurabh,
>
> Here your ratesAndPreds is an RDD of type RDD[((Int, Int), (Double, Double))],
> not an Array. Now, if you want to save it to disk, you can simply call
> saveAsTextFile and provide the location.
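For context on why the original `ratesAndPreds.foreach(pw.println)` fails: closures passed to RDD operations are serialized and shipped to executors, and `java.io.PrintWriter` is not serializable. The root cause can be reproduced without Spark using plain JVM serialization (`Holder` and `SerializationDemo` are illustrative names, not from this thread):

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream, PrintWriter, StringWriter}

// A serializable wrapper whose field is a PrintWriter -- analogous to a
// Spark closure that captures `pw` from the driver.
case class Holder(pw: PrintWriter) extends Serializable

object SerializationDemo extends App {
  val holder = Holder(new PrintWriter(new StringWriter()))
  val oos = new ObjectOutputStream(new ByteArrayOutputStream())
  try {
    // Java serialization walks the object graph and hits the PrintWriter field.
    oos.writeObject(holder)
    println("serialized OK (unexpected)")
  } catch {
    case e: NotSerializableException =>
      // prints: NotSerializableException: java.io.PrintWriter
      println("NotSerializableException: " + e.getMessage)
  }
}
```

This is why the fix is to let Spark write the output itself (saveAsTextFile) rather than capture a driver-side writer in the closure.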
> So change your last line from this:
>
> ratesAndPreds.foreach(pw.println)
>
> to this:
>
> ratesAndPreds.saveAsTextFile("/path/CFOutput")
>
> Thanks
> Best Regards
>
> On Mon, Nov 24, 2014 at 5:05 PM, Saurabh Agrawal <saurabh.agra...@markit.com> wrote:
>
> import java.io.{File, PrintWriter}
> import org.apache.spark.mllib.recommendation.ALS
> import org.apache.spark.mllib.recommendation.Rating
>
> // Load and parse the data
> val data = sc.textFile("/path/CFReady.txt")
> val ratings = data.map(_.split('\t') match { case Array(user, item, rate) =>
>   Rating(user.toInt, item.toInt, rate.toDouble)
> })
>
> // Build the recommendation model using ALS
> val rank = 50
> val numIterations = 100
> val model = ALS.train(ratings, rank, numIterations, 0.10)
>
> // Evaluate the model on rating data
> val usersProducts = ratings.map { case Rating(user, product, rate) =>
>   (user, product)
> }
> val predictions =
>   model.predict(usersProducts).map { case Rating(user, product, rate) =>
>     ((user, product), rate)
>   }
> val ratesAndPreds = ratings.map { case Rating(user, product, rate) =>
>   ((user, product), rate)
> }.join(predictions)
> val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) =>
>   val err = (r1 - r2)
>   err * err
> }.mean()
> println("Mean Squared Error = " + MSE)
>
> val pw = new PrintWriter(new File("/path/CFOutput.txt"))
> ratesAndPreds.foreach(pw.println)
>
> Hi,
>
> Consider the highlighted code: I am trying to write the output of the
> ratesAndPreds RDD to disk.
> But I get this error:
>
> Task not serializable
>         at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>         at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>         at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
>         at org.apache.spark.rdd.RDD.foreach(RDD.scala:758)
>         at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:36)
>         ... (REPL and spark-submit frames elided) ...
> Caused by: java.io.NotSerializableException: java.io.PrintWriter
>         at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>         ... (repeated ObjectOutputStream frames elided) ...
>         at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>         at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
>         at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
>         at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
>         ... 45 more
>
> I am very new to Spark and Scala; can somebody please guide me here?
>
> Thanks,
> Saurabh
>
> ------------------------------
>
> This e-mail, including accompanying communications and attachments, is
> strictly confidential and only for the intended recipient. Any retention,
> use or disclosure not expressly authorised by Markit is prohibited. This
> email is subject to all waivers and other terms at the following link:
> http://www.markit.com/en/about/legal/email-disclaimer.page
>
> Please visit http://www.markit.com/en/about/contact/contact-us.page? for
> contact information on our offices worldwide.
> MarkitSERV Limited has its registered office located at Level 4, Ropemaker
> Place, 25 Ropemaker Street, London, EC2Y 9LY and is authorized and
> regulated by the Financial Conduct Authority with registration number 207294