Sure .. here it is (scroll below to see the NotSerializableException). Note that upstream, I load the (user,item,ratings) data from a file using ObjectInputStream, do some calculations that I put in a map, and then create the rdd used in the code above from that map. I even tried checkpointing the rdd and persisting it to break any lineage back to the original ObjectInputStream (if that was what was happening) -
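(For concreteness, a minimal sketch of the load-and-checkpoint sequence described above. The file path, the map's type, and names such as ratingsMap and ratingsRdd are illustrative assumptions, not from the thread. The reported stack trace follows the sketch.)

import java.io.{FileInputStream, ObjectInputStream}

// Load the upstream (user, item, rating) data via ObjectInputStream,
// as described above (path and element type are assumptions).
val ois = new ObjectInputStream(new FileInputStream("/path/to/ratings.ser"))
val ratingsMap = ois.readObject().asInstanceOf[Map[(String, String), Double]]
ois.close()

// Build the RDD from the in-memory map, then persist and checkpoint it
// to try to sever any lineage back to the stream.
sc.setCheckpointDir("/tmp/spark-checkpoints")
val ratingsRdd = sc.parallelize(ratingsMap.toSeq.map { case ((u, i), r) => (u, i, r) })
ratingsRdd.persist()
ratingsRdd.checkpoint()
ratingsRdd.count()  // the checkpoint is only written once an action runs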
org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:1478)
  at org.apache.spark.rdd.RDD.flatMap(RDD.scala:295)
  at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:38)
  at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:46)
  at $iwC$$iwC$$iwC$$iwC.<init>(<console>:48)
  at $iwC$$iwC$$iwC.<init>(<console>:50)
  at $iwC$$iwC.<init>(<console>:52)
  at $iwC.<init>(<console>:54)
  at <init>(<console>:56)
  at .<init>(<console>:60)
  at .<clinit>(<console>)
  at .<init>(<console>:7)
  at .<clinit>(<console>)
  at $print(<console>)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)
  at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)
  at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)
  at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)
  at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)
  at org.apache.spark.repl.SparkILoop.pasteCommand(SparkILoop.scala:796)
  at org.apache.spark.repl.SparkILoop$$anonfun$standardCommands$8.apply(SparkILoop.scala:321)
  at org.apache.spark.repl.SparkILoop$$anonfun$standardCommands$8.apply(SparkILoop.scala:321)
  at scala.tools.nsc.interpreter.LoopCommands$LoopCommand$$anonfun$nullary$1.apply(LoopCommands.scala:65)
  at scala.tools.nsc.interpreter.LoopCommands$LoopCommand$$anonfun$nullary$1.apply(LoopCommands.scala:65)
  at scala.tools.nsc.interpreter.LoopCommands$NullaryCmd.apply(LoopCommands.scala:76)
  at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:780)
  at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:628)
  at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:636)
  at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641)
  at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:968)
  at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
  at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
  at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
  at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916)
  at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011)
  at org.apache.spark.repl.Main$.main(Main.scala:31)
  at org.apache.spark.repl.Main.main(Main.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:606)
  at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
*Caused by: java.io.NotSerializableException: java.io.ObjectInputStream*
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
  at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
  at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
  at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
  at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
  at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
  at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
  ...

On Mon, Aug 31, 2015 at 12:23 PM Ted Yu <yuzhih...@gmail.com> wrote:

> Ashish:
> Can you post the complete stack trace for NotSerializableException ?
>
> Cheers
>
> On Mon, Aug 31, 2015 at 8:49 AM, Ashish Shrowty <ashish.shro...@gmail.com> wrote:
>
>> bcItemsIdx is just a broadcast variable constructed out of
>> Array[(String)] .. it holds the item ids and I use it for indexing the
>> MatrixEntry objects
>>
>> On Mon, Aug 31, 2015 at 10:41 AM Sean Owen <so...@cloudera.com> wrote:
>>
>>> It's not clear; that error is different still, and somehow suggests
>>> you're serializing a stream somewhere. I'd look at what's inside
>>> bcItemsIdx, as that is not shown here.
>>>
>>> On Mon, Aug 31, 2015 at 3:34 PM, Ashish Shrowty <ashish.shro...@gmail.com> wrote:
>>> > Sean,
>>> >
>>> > Thanks for your comments. What I was really trying to do was to
>>> > transform an RDD[(userid,itemid,ratings)] into a RowMatrix so that I
>>> > can do some column similarity calculations while exploring the data,
>>> > before building some models. But to do that I first need to convert
>>> > the user and item ids into their respective indexes, where I intended
>>> > to pass an array into the closure, which is where I got stuck with
>>> > this OverflowError while trying to figure out where it is happening.
>>> > The actual error I got was slightly different (Caused by:
>>> > java.io.NotSerializableException: java.io.ObjectInputStream).
>>> > Investigating that led me to the earlier code snippet that I had
>>> > posted. This is again because of the bcItemsIdx variable being passed
>>> > into the closure. The code below works if I don't pass in the
>>> > variable and simply use a constant like 10 in its place. The code
>>> > thus far -
>>> >
>>> > // rdd below is RDD[(String,String,Double)]
>>> > // bcItemsIdx below is Broadcast[Array[String]], an array of item ids
>>> > val gRdd = rdd.map{ case (user,item,rating) => ((user),(item,rating)) }.groupByKey
>>> > val idxRdd = gRdd.zipWithIndex
>>> > val cm = new CoordinateMatrix(
>>> >   idxRdd.flatMap[MatrixEntry](e => {
>>> >     e._1._2.map(item => {
>>> >       // <- This is where I get the serialization error, passing in the index:
>>> >       MatrixEntry(e._2, bcItemsIdx.value.indexOf(item._1), item._2)
>>> >       // MatrixEntry(e._2, 10, item._2) // <- This works
>>> >     })
>>> >   })
>>> > )
>>> > val rm = cm.toRowMatrix
>>> > val simMatrix = rm.columnSimilarities()
>>> >
>>> > I would like to make this work in the Spark shell, as I am still
>>> > exploring the data. Let me know if there is an alternate way of
>>> > constructing the RowMatrix.
>>> >
>>> > Thanks, and appreciate all the help!
>>> >
>>> > Ashish
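(One alternate way to build the index, sketched here as an untested suggestion: broadcast a Map[String, Int] from item id to column index instead of calling indexOf on a broadcast Array, which is a linear scan per entry. Names like bcItemIdx and entries are illustrative; this assumes the spark-shell, where the pair-RDD implicits are already in scope, and that the item ids are distinct.)

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

// Build the item-id -> column-index map once on the driver and broadcast it.
val itemIds = rdd.map(_._2).distinct.collect      // rdd: RDD[(String, String, Double)]
val bcItemIdx = sc.broadcast(itemIds.zipWithIndex.toMap)

val entries = rdd.map { case (user, item, rating) => (user, (item, rating)) }
  .groupByKey
  .zipWithIndex
  .flatMap { case ((user, itemRatings), rowIdx) =>
    itemRatings.map { case (item, rating) =>
      MatrixEntry(rowIdx, bcItemIdx.value(item), rating)  // O(1) map lookup
    }
  }
val cm = new CoordinateMatrix(entries)
val simMatrix = cm.toRowMatrix.columnSimilarities()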
>>> > On Mon, Aug 31, 2015 at 3:41 AM Sean Owen <so...@cloudera.com> wrote:
>>> >
>>> >> Yeah, I see that now. I think it fails immediately because the map
>>> >> operation does try to clean and/or verify the serialization of the
>>> >> closure upfront.
>>> >>
>>> >> I'm not quite sure what is going on, but I think it's some strange
>>> >> interaction between how you're building up the list, what the
>>> >> resulting representation happens to be, and how the closure cleaner
>>> >> works, which can't be perfect. The shell also introduces an extra
>>> >> layer of issues.
>>> >>
>>> >> For example, the slightly more canonical approaches work fine:
>>> >>
>>> >> import scala.collection.mutable.MutableList
>>> >> val lst = MutableList[(String,String,Double)]()
>>> >> (0 to 10000).foreach(i => lst += (("10", "10", i.toDouble)))
>>> >>
>>> >> or just
>>> >>
>>> >> val lst = (0 to 10000).map(i => ("10", "10", i.toDouble))
>>> >>
>>> >> If you just need this to work, maybe those are better alternatives
>>> >> anyway. You can also check whether it works without the shell, as I
>>> >> suspect that's a factor.
>>> >>
>>> >> It's not an error in Spark per se; it's saying that something's
>>> >> default Java serialization graph is very deep, so it's as if the code
>>> >> you wrote, plus the closure cleaner, ends up pulling in some huge
>>> >> linked list and serializing it the direct and unuseful way.
>>> >>
>>> >> If you have an idea about exactly why it's happening, you can open a
>>> >> JIRA, but arguably it's something that would be nice to just work yet
>>> >> isn't to do with Spark per se. Or have a look at other issues related
>>> >> to the closure cleaner and the shell; you may find this is related to
>>> >> other known behavior.
>>> >>
>>> >> On Sun, Aug 30, 2015 at 8:08 PM, Ashish Shrowty <ashish.shro...@gmail.com> wrote:
>>> >> > Sean .. does the code below work for you in the Spark shell? Ted
>>> >> > got the same error -
>>> >> >
>>> >> > val a = 10
>>> >> > val lst = MutableList[(String,String,Double)]()
>>> >> > Range(0,10000).foreach(i => lst += (("10","10",i:Double)))
>>> >> > sc.makeRDD(lst).map(i => if (a==10) 1 else 0)
>>> >> >
>>> >> > -Ashish
>>> >> >
>>> >> > On Sun, Aug 30, 2015 at 2:52 PM Sean Owen <so...@cloudera.com> wrote:
>>> >> >>
>>> >> >> I'm not sure how to reproduce it? This code does not produce an
>>> >> >> error in master.
>>> >> >>
>>> >> >> On Sun, Aug 30, 2015 at 7:26 PM, Ashish Shrowty <ashish.shro...@gmail.com> wrote:
>>> >> >> > Do you think I should create a JIRA?
>>> >> >> >
>>> >> >> > On Sun, Aug 30, 2015 at 12:56 PM Ted Yu <yuzhih...@gmail.com> wrote:
>>> >> >> >>
>>> >> >> >> I got StackOverflowError as well :-(
>>> >> >> >>
>>> >> >> >> On Sun, Aug 30, 2015 at 9:47 AM, Ashish Shrowty <ashish.shro...@gmail.com> wrote:
>>> >> >> >>>
>>> >> >> >>> Yep .. I tried that too earlier. Doesn't make a difference.
>>> >> >> >>> Are you able to replicate on your side?
>>> >> >> >>>
>>> >> >> >>> On Sun, Aug 30, 2015 at 12:08 PM Ted Yu <yuzhih...@gmail.com> wrote:
>>> >> >> >>>>
>>> >> >> >>>> I see.
>>> >> >> >>>>
>>> >> >> >>>> What about using the following in place of variable a ?
>>> >> >> >>>>
>>> >> >> >>>> http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables
>>> >> >> >>>>
>>> >> >> >>>> Cheers
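(The broadcast pattern Ted is pointing at would look roughly like the sketch below in the shell; bcA is an illustrative name and lst is the list from the snippets upthread. Note that Ashish says above that he had already tried broadcasting without luck, so this is the pattern being suggested, not a confirmed fix.)

// Broadcast the value once instead of closing over the shell-bound val `a`.
val bcA = sc.broadcast(10)
val rdd = sc.makeRDD(lst).map(i => if (bcA.value == 10) 1 else 0)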
>>> >> >> >>>> >>> >> >> >>>> >>> >> >> >>>> >>> >> >> >>>> >>> http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables >>> >> >> >>>> >>> >> >> >>>> Cheers >>> >> >> >>>> >>> >> >> >>>> On Sun, Aug 30, 2015 at 8:54 AM, Ashish Shrowty >>> >> >> >>>> <ashish.shro...@gmail.com> wrote: >>> >> >> >>>>> >>> >> >> >>>>> @Sean - Agree that there is no action, but I still get the >>> >> >> >>>>> stackoverflowerror, its very weird >>> >> >> >>>>> >>> >> >> >>>>> @Ted - Variable a is just an int - val a = 10 ... The error >>> >> >> >>>>> happens >>> >> >> >>>>> when I try to pass a variable into the closure. The example >>> you >>> >> >> >>>>> have >>> >> >> >>>>> above >>> >> >> >>>>> works fine since there is no variable being passed into the >>> >> >> >>>>> closure >>> >> >> >>>>> from the >>> >> >> >>>>> shell. >>> >> >> >>>>> >>> >> >> >>>>> -Ashish >>> >> >> >>>>> >>> >> >> >>>>> On Sun, Aug 30, 2015 at 9:55 AM Ted Yu <yuzhih...@gmail.com> >>> >> >> >>>>> wrote: >>> >> >> >>>>>> >>> >> >> >>>>>> Using Spark shell : >>> >> >> >>>>>> >>> >> >> >>>>>> scala> import scala.collection.mutable.MutableList >>> >> >> >>>>>> import scala.collection.mutable.MutableList >>> >> >> >>>>>> >>> >> >> >>>>>> scala> val lst = MutableList[(String,String,Double)]() >>> >> >> >>>>>> lst: scala.collection.mutable.MutableList[(String, String, >>> >> >> >>>>>> Double)] >>> >> >> >>>>>> = >>> >> >> >>>>>> MutableList() >>> >> >> >>>>>> >>> >> >> >>>>>> scala> >>> Range(0,10000).foreach(i=>lst+=(("10","10",i:Double))) >>> >> >> >>>>>> >>> >> >> >>>>>> scala> val rdd=sc.makeRDD(lst).map(i=> if(a==10) 1 else 0) >>> >> >> >>>>>> <console>:27: error: not found: value a >>> >> >> >>>>>> val rdd=sc.makeRDD(lst).map(i=> if(a==10) 1 else 0) >>> >> >> >>>>>> ^ >>> >> >> >>>>>> >>> >> >> >>>>>> scala> val rdd=sc.makeRDD(lst).map(i=> if(i._1==10) 1 else >>> 0) >>> >> >> >>>>>> rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at >>> map >>> >> >> >>>>>> at >>> >> >> >>>>>> <console>:27 >>> >> >> >>>>>> >>> >> >> >>>>>> scala> rdd.count() >>> >> >> >>>>>> ... >>> >> >> >>>>>> 15/08/30 06:53:40 INFO DAGScheduler: Job 0 finished: count >>> at >>> >> >> >>>>>> <console>:30, took 0.478350 s >>> >> >> >>>>>> res1: Long = 10000 >>> >> >> >>>>>> >>> >> >> >>>>>> Ashish: >>> >> >> >>>>>> Please refine your example to mimic more closely what your >>> code >>> >> >> >>>>>> actually did. >>> >> >> >>>>>> >>> >> >> >>>>>> Thanks >>> >> >> >>>>>> >>> >> >> >>>>>> On Sun, Aug 30, 2015 at 12:24 AM, Sean Owen < >>> so...@cloudera.com> >>> >> >> >>>>>> wrote: >>> >> >> >>>>>>> >>> >> >> >>>>>>> That can't cause any error, since there is no action in >>> your >>> >> >> >>>>>>> first >>> >> >> >>>>>>> snippet. Even calling count on the result doesn't cause an >>> >> >> >>>>>>> error. >>> >> >> >>>>>>> You >>> >> >> >>>>>>> must be executing something different. >>> >> >> >>>>>>> >>> >> >> >>>>>>> On Sun, Aug 30, 2015 at 4:21 AM, ashrowty >>> >> >> >>>>>>> <ashish.shro...@gmail.com> >>> >> >> >>>>>>> wrote: >>> >> >> >>>>>>> > I am running the Spark shell (1.2.1) in local mode and I >>> have >>> >> >> >>>>>>> > a >>> >> >> >>>>>>> > simple >>> >> >> >>>>>>> > RDD[(String,String,Double)] with about 10,000 objects in >>> it. 
>>> >> >> >>>>>>> > I >>> >> >> >>>>>>> > get >>> >> >> >>>>>>> > a >>> >> >> >>>>>>> > StackOverFlowError each time I try to run the following >>> code >>> >> >> >>>>>>> > (the >>> >> >> >>>>>>> > code >>> >> >> >>>>>>> > itself is just representative of other logic where I >>> need to >>> >> >> >>>>>>> > pass >>> >> >> >>>>>>> > in a >>> >> >> >>>>>>> > variable). I tried broadcasting the variable too, but no >>> luck >>> >> >> >>>>>>> > .. >>> >> >> >>>>>>> > missing >>> >> >> >>>>>>> > something basic here - >>> >> >> >>>>>>> > >>> >> >> >>>>>>> > val rdd = sc.makeRDD(List(<Data read from file>) >>> >> >> >>>>>>> > val a=10 >>> >> >> >>>>>>> > rdd.map(r => if (a==10) 1 else 0) >>> >> >> >>>>>>> > This throws - >>> >> >> >>>>>>> > >>> >> >> >>>>>>> > java.lang.StackOverflowError >>> >> >> >>>>>>> > at >>> >> >> >>>>>>> > >>> java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:318) >>> >> >> >>>>>>> > at >>> >> >> >>>>>>> > >>> >> >> >>>>>>> > >>> >> >> >>>>>>> > >>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1133) >>> >> >> >>>>>>> > at >>> >> >> >>>>>>> > >>> >> >> >>>>>>> > >>> >> >> >>>>>>> > >>> >> >> >>>>>>> > >>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) >>> >> >> >>>>>>> > at >>> >> >> >>>>>>> > >>> >> >> >>>>>>> > >>> >> >> >>>>>>> > >>> >> >> >>>>>>> > >>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) >>> >> >> >>>>>>> > at >>> >> >> >>>>>>> > >>> >> >> >>>>>>> > >>> >> >> >>>>>>> > >>> >> >> >>>>>>> > >>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) >>> >> >> >>>>>>> > at >>> >> >> >>>>>>> > >>> >> >> >>>>>>> > >>> >> >> >>>>>>> > >>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) >>> >> >> >>>>>>> > at >>> >> >> >>>>>>> > >>> >> >> >>>>>>> > >>> >> >> >>>>>>> > >>> >> >> >>>>>>> > >>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) >>> >> >> >>>>>>> > at >>> >> >> >>>>>>> > >>> >> >> >>>>>>> > >>> >> >> >>>>>>> > >>> >> >> >>>>>>> > >>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) >>> >> >> >>>>>>> > at >>> >> >> >>>>>>> > >>> >> >> >>>>>>> > >>> >> >> >>>>>>> > >>> >> >> >>>>>>> > >>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) >>> >> >> >>>>>>> > ... >>> >> >> >>>>>>> > ... >>> >> >> >>>>>>> > >>> >> >> >>>>>>> > More experiments .. this works - >>> >> >> >>>>>>> > >>> >> >> >>>>>>> > val lst = >>> Range(0,10000).map(i=>("10","10",i:Double)).toList >>> >> >> >>>>>>> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0) >>> >> >> >>>>>>> > >>> >> >> >>>>>>> > But below doesn't and throws the StackoverflowError - >>> >> >> >>>>>>> > >>> >> >> >>>>>>> > val lst = MutableList[(String,String,Double)]() >>> >> >> >>>>>>> > Range(0,10000).foreach(i=>lst+=(("10","10",i:Double))) >>> >> >> >>>>>>> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0) >>> >> >> >>>>>>> > >>> >> >> >>>>>>> > Any help appreciated! >>> >> >> >>>>>>> > >>> >> >> >>>>>>> > Thanks, >>> >> >> >>>>>>> > Ashish >>> >> >> >>>>>>> > >>> >> >> >>>>>>> > >>> >> >> >>>>>>> > >>> >> >> >>>>>>> > -- >>> >> >> >>>>>>> > View this message in context: >>> >> >> >>>>>>> > >>> >> >> >>>>>>> > >>> >> >> >>>>>>> > >>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-shell-and-StackOverFlowError-tp24508.html >>> >> >> >>>>>>> > Sent from the Apache Spark User List mailing list >>> archive at >>> >> >> >>>>>>> > Nabble.com. 