bcItemsIdx is just a broadcast variable constructed from an Array[String] .. it holds the item ids, and I use it for indexing the MatrixEntry objects.
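(A minimal sketch of how such a broadcast index might be built; the construction is not shown in the thread, and the rdd name and (user, item, rating) field order are taken from the code further below:)

    // Collect the distinct item ids on the driver and broadcast them.
    // rdd is assumed to be RDD[(String, String, Double)] of (user, item, rating).
    val itemIds: Array[String] = rdd.map(_._2).distinct.collect()
    val bcItemsIdx = sc.broadcast(itemIds)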
On Mon, Aug 31, 2015 at 10:41 AM Sean Owen <so...@cloudera.com> wrote:
> It's not clear; that error is different still and somehow suggests
> you're serializing a stream somewhere. I'd look at what's inside
> bcItemsIdx as that is not shown here.
>
> On Mon, Aug 31, 2015 at 3:34 PM, Ashish Shrowty <ashish.shro...@gmail.com> wrote:
> > Sean,
> >
> > Thanks for your comments. What I was really trying to do was to transform an
> > RDD[(userid,itemid,rating)] into a RowMatrix so that I can do some column
> > similarity calculations while exploring the data before building some
> > models. But to do that I first need to convert the user and item ids into
> > their respective indexes, and I intended to pass an array into the closure,
> > which is where I got stuck with this StackOverflowError while trying to
> > figure out where it happens. The actual error I got was slightly different
> > (Caused by: java.io.NotSerializableException: java.io.ObjectInputStream).
> > Investigating that issue led me to the code snippet I posted earlier. This
> > again comes down to the bcItemsIdx variable being passed into the closure.
> > The code below works if I don't pass in the variable and simply use a
> > constant like 10 in its place. The code thus far -
> >
> > // rdd below is RDD[(String,String,Double)]
> > // bcItemsIdx below is Broadcast[Array[String]], an array of item ids
> > val gRdd = rdd.map { case (user, item, rating) => (user, (item, rating)) }.groupByKey
> > val idxRdd = gRdd.zipWithIndex
> > val cm = new CoordinateMatrix(
> >   idxRdd.flatMap[MatrixEntry](e => {
> >     e._1._2.map(item =>
> >       MatrixEntry(e._2, bcItemsIdx.value.indexOf(item._1), item._2) // <- serialization error when the index is passed in
> >       // MatrixEntry(e._2, 10, item._2) // <- this works
> >     )
> >   })
> > )
> > val rm = cm.toRowMatrix
> > val simMatrix = rm.columnSimilarities()
> >
> > I would like to make this work in the Spark shell as I am still exploring
> > the data. Let me know if there is an alternate way of constructing the
> > RowMatrix.
> >
> > Thanks and appreciate all the help!
> >
> > Ashish
> >
> > On Mon, Aug 31, 2015 at 3:41 AM Sean Owen <so...@cloudera.com> wrote:
> >>
> >> Yeah, I see that now. I think it fails immediately because the map
> >> operation does try to clean and/or verify the serialization of the
> >> closure upfront.
> >>
> >> I'm not quite sure what is going on, but I think it's some strange
> >> interaction between how you're building up the list, what the resulting
> >> representation happens to be like, and how the closure cleaner works,
> >> which can't be perfect. The shell also introduces an extra layer of
> >> issues.
> >>
> >> For example, the slightly more canonical approaches work fine:
> >>
> >> import scala.collection.mutable.MutableList
> >> val lst = MutableList[(String,String,Double)]()
> >> (0 to 10000).foreach(i => lst += (("10", "10", i.toDouble)))
> >>
> >> or just
> >>
> >> val lst = (0 to 10000).map(i => ("10", "10", i.toDouble))
> >>
> >> If you just need this to work, maybe those are better alternatives anyway.
> >> You can also check whether it works without the shell, as I suspect
> >> that's a factor.
> >>
> >> It's not an error in Spark per se; it's saying that something's default
> >> Java serialization graph is very deep, so the code you wrote plus the
> >> closure cleaner ends up pulling in some huge linked list and serializing
> >> it the direct and unuseful way.
> >>
> >> If you have an idea about exactly why it's happening you can open a
> >> JIRA, but arguably it's something that would be nice to just work yet
> >> isn't really to do with Spark per se. Or, have a look at other issues
> >> related to the closure cleaner and the shell; you may find this matches
> >> other known behavior.
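(On the question of an alternate way of constructing the RowMatrix: a hedged sketch, not from the thread, that broadcasts a Map[String, Long] instead of an Array, so each column lookup is O(1) rather than a linear indexOf scan, and the closure captures only the Broadcast handle. The names itemIdx, bcIdx, and entries are illustrative; whether this also avoids the shell serialization problem is not verified in the thread.)

    import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

    // Build an item-id -> column-index map once on the driver and broadcast it.
    val itemIdx: Map[String, Long] =
      rdd.map(_._2).distinct.collect().zipWithIndex
        .map { case (id, i) => (id, i.toLong) }.toMap
    val bcIdx = sc.broadcast(itemIdx)

    // One MatrixEntry per (user, item, rating): the row index comes from
    // zipWithIndex over the grouped users, the column index from the map.
    val entries = rdd.map { case (user, item, rating) => (user, (item, rating)) }
      .groupByKey
      .zipWithIndex
      .flatMap { case ((_, items), row) =>
        items.map { case (item, rating) => MatrixEntry(row, bcIdx.value(item), rating) }
      }
    val cm = new CoordinateMatrix(entries)
    val simMatrix = cm.toRowMatrix.columnSimilarities()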
> >> On Sun, Aug 30, 2015 at 8:08 PM, Ashish Shrowty <ashish.shro...@gmail.com> wrote:
> >> > Sean .. does the code below work for you in the Spark shell? Ted got the
> >> > same error -
> >> >
> >> > val a = 10
> >> > val lst = MutableList[(String,String,Double)]()
> >> > Range(0,10000).foreach(i => lst += (("10","10",i:Double)))
> >> > sc.makeRDD(lst).map(i => if (a==10) 1 else 0)
> >> >
> >> > -Ashish
> >> >
> >> > On Sun, Aug 30, 2015 at 2:52 PM Sean Owen <so...@cloudera.com> wrote:
> >> >>
> >> >> I'm not sure how to reproduce it; this code does not produce an error
> >> >> in master.
> >> >>
> >> >> On Sun, Aug 30, 2015 at 7:26 PM, Ashish Shrowty <ashish.shro...@gmail.com> wrote:
> >> >> > Do you think I should create a JIRA?
> >> >> >
> >> >> > On Sun, Aug 30, 2015 at 12:56 PM Ted Yu <yuzhih...@gmail.com> wrote:
> >> >> >>
> >> >> >> I got StackOverflowError as well :-(
> >> >> >>
> >> >> >> On Sun, Aug 30, 2015 at 9:47 AM, Ashish Shrowty <ashish.shro...@gmail.com> wrote:
> >> >> >>>
> >> >> >>> Yep .. I tried that too earlier. It doesn't make a difference. Are
> >> >> >>> you able to replicate it on your side?
> >> >> >>>
> >> >> >>> On Sun, Aug 30, 2015 at 12:08 PM Ted Yu <yuzhih...@gmail.com> wrote:
> >> >> >>>>
> >> >> >>>> I see.
> >> >> >>>>
> >> >> >>>> What about using the following in place of variable a?
> >> >> >>>>
> >> >> >>>> http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables
> >> >> >>>>
> >> >> >>>> Cheers
> >> >> >>>>
> >> >> >>>> On Sun, Aug 30, 2015 at 8:54 AM, Ashish Shrowty <ashish.shro...@gmail.com> wrote:
> >> >> >>>>>
> >> >> >>>>> @Sean - Agreed that there is no action, but I still get the
> >> >> >>>>> StackOverflowError; it's very weird.
> >> >> >>>>>
> >> >> >>>>> @Ted - Variable a is just an Int - val a = 10. The error happens
> >> >> >>>>> when I try to pass a variable into the closure. The example you
> >> >> >>>>> have above works fine since there is no variable being passed
> >> >> >>>>> into the closure from the shell.
> >> >> >>>>>
> >> >> >>>>> -Ashish
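(For concreteness, this is what Ted's broadcast suggestion would look like applied to the repro above; an illustration only, and the thread reports it did not make a difference here:)

    // Broadcast the constant instead of capturing the shell-local variable.
    val bcA = sc.broadcast(10)
    sc.makeRDD(lst).map(i => if (bcA.value == 10) 1 else 0)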
> >> >> >>>>> On Sun, Aug 30, 2015 at 9:55 AM Ted Yu <yuzhih...@gmail.com> wrote:
> >> >> >>>>>>
> >> >> >>>>>> Using the Spark shell:
> >> >> >>>>>>
> >> >> >>>>>> scala> import scala.collection.mutable.MutableList
> >> >> >>>>>> import scala.collection.mutable.MutableList
> >> >> >>>>>>
> >> >> >>>>>> scala> val lst = MutableList[(String,String,Double)]()
> >> >> >>>>>> lst: scala.collection.mutable.MutableList[(String, String, Double)] = MutableList()
> >> >> >>>>>>
> >> >> >>>>>> scala> Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
> >> >> >>>>>>
> >> >> >>>>>> scala> val rdd=sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
> >> >> >>>>>> <console>:27: error: not found: value a
> >> >> >>>>>>        val rdd=sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
> >> >> >>>>>>                                           ^
> >> >> >>>>>>
> >> >> >>>>>> scala> val rdd=sc.makeRDD(lst).map(i=> if(i._1==10) 1 else 0)
> >> >> >>>>>> rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:27
> >> >> >>>>>>
> >> >> >>>>>> scala> rdd.count()
> >> >> >>>>>> ...
> >> >> >>>>>> 15/08/30 06:53:40 INFO DAGScheduler: Job 0 finished: count at <console>:30, took 0.478350 s
> >> >> >>>>>> res1: Long = 10000
> >> >> >>>>>>
> >> >> >>>>>> Ashish:
> >> >> >>>>>> Please refine your example to mimic more closely what your code
> >> >> >>>>>> actually did.
> >> >> >>>>>>
> >> >> >>>>>> Thanks
> >> >> >>>>>>
> >> >> >>>>>> On Sun, Aug 30, 2015 at 12:24 AM, Sean Owen <so...@cloudera.com> wrote:
> >> >> >>>>>>>
> >> >> >>>>>>> That can't cause any error, since there is no action in your
> >> >> >>>>>>> first snippet. Even calling count on the result doesn't cause
> >> >> >>>>>>> an error. You must be executing something different.
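(One way to test Sean's deep-serialization-graph diagnosis entirely outside Spark, a sketch not taken from the thread, is to push the collection through plain Java serialization and see whether that alone overflows the stack:)

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}

    // If the MutableList's object graph is deeply recursive, writeObject
    // recurses once per node and can throw StackOverflowError by itself.
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(lst)
    oos.close()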
> >> >> >>>>>>> On Sun, Aug 30, 2015 at 4:21 AM, ashrowty <ashish.shro...@gmail.com> wrote:
> >> >> >>>>>>> > I am running the Spark shell (1.2.1) in local mode and I have a
> >> >> >>>>>>> > simple RDD[(String,String,Double)] with about 10,000 objects in
> >> >> >>>>>>> > it. I get a StackOverflowError each time I try to run the
> >> >> >>>>>>> > following code (the code itself is just representative of other
> >> >> >>>>>>> > logic where I need to pass in a variable). I tried broadcasting
> >> >> >>>>>>> > the variable too, but no luck .. missing something basic here -
> >> >> >>>>>>> >
> >> >> >>>>>>> > val rdd = sc.makeRDD(List(<Data read from file>))
> >> >> >>>>>>> > val a = 10
> >> >> >>>>>>> > rdd.map(r => if (a==10) 1 else 0)
> >> >> >>>>>>> >
> >> >> >>>>>>> > This throws -
> >> >> >>>>>>> >
> >> >> >>>>>>> > java.lang.StackOverflowError
> >> >> >>>>>>> >   at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:318)
> >> >> >>>>>>> >   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1133)
> >> >> >>>>>>> >   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> >> >> >>>>>>> >   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> >> >> >>>>>>> >   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> >> >> >>>>>>> >   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> >> >> >>>>>>> >   at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> >> >> >>>>>>> >   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> >> >> >>>>>>> >   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> >> >> >>>>>>> >   ...
> >> >> >>>>>>> >
> >> >> >>>>>>> > More experiments .. this works -
> >> >> >>>>>>> >
> >> >> >>>>>>> > val lst = Range(0,10000).map(i => ("10","10",i:Double)).toList
> >> >> >>>>>>> > sc.makeRDD(lst).map(i => if (a==10) 1 else 0)
> >> >> >>>>>>> >
> >> >> >>>>>>> > But the below doesn't, and throws the StackOverflowError -
> >> >> >>>>>>> >
> >> >> >>>>>>> > val lst = MutableList[(String,String,Double)]()
> >> >> >>>>>>> > Range(0,10000).foreach(i => lst += (("10","10",i:Double)))
> >> >> >>>>>>> > sc.makeRDD(lst).map(i => if (a==10) 1 else 0)
> >> >> >>>>>>> >
> >> >> >>>>>>> > Any help appreciated!
> >> >> >>>>>>> >
> >> >> >>>>>>> > Thanks,
> >> >> >>>>>>> > Ashish
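(A workaround that follows from the experiments above, offered as a sketch: the thread only verifies the Range(...).toList variant, and whether the shell still captures the original lst through its line object is not confirmed. The idea is to convert the MutableList to a flat, immutable collection before handing it to Spark, so the serialized graph stays shallow:)

    // lst is the MutableList from the failing snippet; toList (as in the
    // working experiment above) or toArray flattens the recursive node
    // structure before the closure cleaner and serializer ever see it.
    sc.makeRDD(lst.toList).map(i => if (a == 10) 1 else 0)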