bcItemsIdx is just a broadcast variable constructed from an Array[String].
It holds the item ids, and I use it for indexing the MatrixEntry objects.
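
A minimal sketch of the indexing step described above, assuming the item ids fit in memory on the driver. It builds the id-to-index lookup once as a Map, so each lookup is a hash probe rather than the linear scan that Array.indexOf performs. The object name ItemIndexSketch and the sample ids and ratings are hypothetical stand-ins, not data from this thread, and the sketch uses plain Scala collections rather than an RDD.

```scala
object ItemIndexSketch {
  // Hypothetical sample ids; in the thread these come from the poster's data.
  val itemIds: Array[String] = Array("item-a", "item-b", "item-c")

  // Build the id -> column index lookup once.
  // The index is a Long because MatrixEntry takes Long row/column indices.
  val itemIndex: Map[String, Long] =
    itemIds.zipWithIndex.map { case (id, i) => (id, i.toLong) }.toMap

  // Stand-in for one user's grouped (itemId, rating) pairs.
  val ratings: Seq[(String, Double)] =
    Seq(("item-c", 4.0), ("item-a", 2.5), ("item-x", 1.0))

  // Resolve each rating to (columnIndex, rating). Ids missing from the lookup
  // are dropped here, whereas indexOf would return -1 for them.
  val indexed: Seq[(Long, Double)] =
    ratings.flatMap { case (id, r) => itemIndex.get(id).map(col => (col, r)) }
}
```

In the shell, one would presumably broadcast itemIndex with sc.broadcast and call .value.get(item._1) inside the closure. Whether that avoids the serialization error discussed in this thread is not established here.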


On Mon, Aug 31, 2015 at 10:41 AM Sean Owen <so...@cloudera.com> wrote:

> It's not clear; that error is different still and somehow suggests
> you're serializing a stream somewhere. I'd look at what's inside
> bcItemsIdx as that is not shown here.
>
> On Mon, Aug 31, 2015 at 3:34 PM, Ashish Shrowty
> <ashish.shro...@gmail.com> wrote:
> > Sean,
> >
> > Thanks for your comments. What I was really trying to do was transform an
> > RDD[(userid,itemid,ratings)] into a RowMatrix so that I can do some column
> > similarity calculations while exploring the data before building some
> > models. To do that I first need to convert the user and item ids into
> > their respective indexes, which I intended to do by passing an array into
> > the closure. That is where I got stuck on this StackOverflowError, trying
> > to figure out where it happens. The actual error I got was slightly
> > different (Caused by: java.io.NotSerializableException:
> > java.io.ObjectInputStream). Investigating it led me to the earlier code
> > snippet that I had posted. This is again because of the bcItemsIdx
> > variable being passed into the closure. The code below works if I don't
> > pass in the variable and simply use a constant like 10 in its place. The
> > code thus far -
> >
> > import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
> >
> > // rdd below is RDD[(String,String,Double)]
> > // bcItemsIdx below is Broadcast[Array[String]], an array of item ids
> > val gRdd = rdd.map { case (user, item, rating) => (user, (item, rating)) }.groupByKey
> > val idxRdd = gRdd.zipWithIndex
> > val cm = new CoordinateMatrix(
> >     idxRdd.flatMap[MatrixEntry](e => {
> >         e._1._2.map(item => {
> >             // <- This is where I get the Serialization error, passing in the index
> >             MatrixEntry(e._2, bcItemsIdx.value.indexOf(item._1), item._2)
> >             // MatrixEntry(e._2, 10, item._2) // <- This works
> >         })
> >     })
> > )
> > val rm = cm.toRowMatrix
> > val simMatrix = rm.columnSimilarities()
> >
> > I would like to make this work in the Spark shell as I am still exploring
> > the data. Let me know if there is an alternate way of constructing the
> > RowMatrix.
> >
> > Thanks and appreciate all the help!
> >
> > Ashish
> >
> > On Mon, Aug 31, 2015 at 3:41 AM Sean Owen <so...@cloudera.com> wrote:
> >>
> >> Yeah I see that now. I think it fails immediately because the map
> >> operation does try to clean and/or verify the serialization of the
> >> closure upfront.
> >>
> >> I'm not quite sure what is going on, but I think it's some strange
> >> interaction between how you're building up the list and what the
> >> resulting representation happens to be like, and how the closure
> >> cleaner works, which can't be perfect. The shell also introduces an
> >> extra layer of issues.
> >>
> >> For example, the slightly more canonical approaches work fine:
> >>
> >> import scala.collection.mutable.MutableList
> >> val lst = MutableList[(String,String,Double)]()
> >> (0 to 10000).foreach(i => lst :+ ("10", "10", i.toDouble))
> >>
> >> or just
> >>
> >> val lst = (0 to 10000).map(i => ("10", "10", i.toDouble))
> >>
> >> If you just need this to work, maybe those are better alternatives
> >> anyway. You can also check whether it works without the shell, as I
> >> suspect that's a factor.
> >>
> >> It's not an error in Spark per se. The error says that some object's
> >> default Java serialization graph is very deep, so it looks like the code
> >> you wrote plus the closure cleaner ends up pulling in some huge linked
> >> list and serializing it in the direct and unhelpful way.
> >>
> >> If you have an idea about exactly why it's happening you can open a
> >> JIRA, but arguably it's something that would be nice to have just work
> >> and isn't to do with Spark per se. Or, have a look at other issues
> >> related to the closure cleaner and the shell; you may find this is
> >> related to other known behavior.
> >>
> >>
> >> On Sun, Aug 30, 2015 at 8:08 PM, Ashish Shrowty
> >> <ashish.shro...@gmail.com> wrote:
> >> > Sean .. does the code below work for you in the Spark shell? Ted got
> >> > the same error -
> >> >
> >> > val a=10
> >> > val lst = MutableList[(String,String,Double)]()
> >> > Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
> >> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
> >> >
> >> > -Ashish
> >> >
> >> >
> >> > On Sun, Aug 30, 2015 at 2:52 PM Sean Owen <so...@cloudera.com> wrote:
> >> >>
> >> >> I'm not sure how to reproduce it. This code does not produce an error
> >> >> in master.
> >> >>
> >> >> On Sun, Aug 30, 2015 at 7:26 PM, Ashish Shrowty
> >> >> <ashish.shro...@gmail.com> wrote:
> >> >> > Do you think I should create a JIRA?
> >> >> >
> >> >> >
> >> >> > On Sun, Aug 30, 2015 at 12:56 PM Ted Yu <yuzhih...@gmail.com> wrote:
> >> >> >>
> >> >> >> I got StackOverFlowError as well :-(
> >> >> >>
> >> >> >> On Sun, Aug 30, 2015 at 9:47 AM, Ashish Shrowty
> >> >> >> <ashish.shro...@gmail.com>
> >> >> >> wrote:
> >> >> >>>
> >> >> >>> Yep .. I tried that too earlier. Doesn't make a difference. Are you
> >> >> >>> able to replicate on your side?
> >> >> >>>
> >> >> >>>
> >> >> >>> On Sun, Aug 30, 2015 at 12:08 PM Ted Yu <yuzhih...@gmail.com>
> >> >> >>> wrote:
> >> >> >>>>
> >> >> >>>> I see.
> >> >> >>>>
> >> >> >>>> What about using the following in place of variable a ?
> >> >> >>>>
> >> >> >>>> http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables
> >> >> >>>>
> >> >> >>>> Cheers
> >> >> >>>>
> >> >> >>>> On Sun, Aug 30, 2015 at 8:54 AM, Ashish Shrowty
> >> >> >>>> <ashish.shro...@gmail.com> wrote:
> >> >> >>>>>
> >> >> >>>>> @Sean - Agree that there is no action, but I still get the
> >> >> >>>>> StackOverflowError, it's very weird
> >> >> >>>>>
> >> >> >>>>> @Ted - Variable a is just an int - val a = 10 ... The error happens
> >> >> >>>>> when I try to pass a variable into the closure. The example you have
> >> >> >>>>> above works fine since there is no variable being passed into the
> >> >> >>>>> closure from the shell.
> >> >> >>>>>
> >> >> >>>>> -Ashish
> >> >> >>>>>
> >> >> >>>>> On Sun, Aug 30, 2015 at 9:55 AM Ted Yu <yuzhih...@gmail.com>
> >> >> >>>>> wrote:
> >> >> >>>>>>
> >> >> >>>>>> Using Spark shell :
> >> >> >>>>>>
> >> >> >>>>>> scala> import scala.collection.mutable.MutableList
> >> >> >>>>>> import scala.collection.mutable.MutableList
> >> >> >>>>>>
> >> >> >>>>>> scala> val lst = MutableList[(String,String,Double)]()
> >> >> >>>>>> lst: scala.collection.mutable.MutableList[(String, String, Double)] = MutableList()
> >> >> >>>>>>
> >> >> >>>>>> scala> Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
> >> >> >>>>>>
> >> >> >>>>>> scala> val rdd=sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
> >> >> >>>>>> <console>:27: error: not found: value a
> >> >> >>>>>>        val rdd=sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
> >> >> >>>>>>                                           ^
> >> >> >>>>>>
> >> >> >>>>>> scala> val rdd=sc.makeRDD(lst).map(i=> if(i._1==10) 1 else 0)
> >> >> >>>>>> rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:27
> >> >> >>>>>>
> >> >> >>>>>> scala> rdd.count()
> >> >> >>>>>> ...
> >> >> >>>>>> 15/08/30 06:53:40 INFO DAGScheduler: Job 0 finished: count at <console>:30, took 0.478350 s
> >> >> >>>>>> res1: Long = 10000
> >> >> >>>>>>
> >> >> >>>>>> Ashish:
> >> >> >>>>>> Please refine your example to mimic more closely what your code
> >> >> >>>>>> actually did.
> >> >> >>>>>>
> >> >> >>>>>> Thanks
> >> >> >>>>>>
> >> >> >>>>>> On Sun, Aug 30, 2015 at 12:24 AM, Sean Owen <so...@cloudera.com>
> >> >> >>>>>> wrote:
> >> >> >>>>>>>
> >> >> >>>>>>> That can't cause any error, since there is no action in your first
> >> >> >>>>>>> snippet. Even calling count on the result doesn't cause an error.
> >> >> >>>>>>> You must be executing something different.
> >> >> >>>>>>>
> >> >> >>>>>>> On Sun, Aug 30, 2015 at 4:21 AM, ashrowty
> >> >> >>>>>>> <ashish.shro...@gmail.com>
> >> >> >>>>>>> wrote:
> >> >> >>>>>>> > I am running the Spark shell (1.2.1) in local mode and I have a
> >> >> >>>>>>> > simple RDD[(String,String,Double)] with about 10,000 objects in
> >> >> >>>>>>> > it. I get a StackOverflowError each time I try to run the
> >> >> >>>>>>> > following code (the code itself is just representative of other
> >> >> >>>>>>> > logic where I need to pass in a variable). I tried broadcasting
> >> >> >>>>>>> > the variable too, but no luck. I must be missing something basic
> >> >> >>>>>>> > here -
> >> >> >>>>>>> >
> >> >> >>>>>>> > val rdd = sc.makeRDD(List(<Data read from file>))
> >> >> >>>>>>> > val a=10
> >> >> >>>>>>> > rdd.map(r => if (a==10) 1 else 0)
> >> >> >>>>>>> > This throws -
> >> >> >>>>>>> >
> >> >> >>>>>>> > java.lang.StackOverflowError
> >> >> >>>>>>> >     at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:318)
> >> >> >>>>>>> >     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1133)
> >> >> >>>>>>> >     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> >> >> >>>>>>> >     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> >> >> >>>>>>> >     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> >> >> >>>>>>> >     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> >> >> >>>>>>> >     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> >> >> >>>>>>> >     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> >> >> >>>>>>> >     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> >> >> >>>>>>> > ...
> >> >> >>>>>>> > ...
> >> >> >>>>>>> >
> >> >> >>>>>>> > More experiments  .. this works -
> >> >> >>>>>>> >
> >> >> >>>>>>> > val lst = Range(0,10000).map(i=>("10","10",i:Double)).toList
> >> >> >>>>>>> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
> >> >> >>>>>>> >
> >> >> >>>>>>> > But the code below doesn't, and throws the StackOverflowError -
> >> >> >>>>>>> >
> >> >> >>>>>>> > val lst = MutableList[(String,String,Double)]()
> >> >> >>>>>>> > Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
> >> >> >>>>>>> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
> >> >> >>>>>>> >
> >> >> >>>>>>> > Any help appreciated!
> >> >> >>>>>>> >
> >> >> >>>>>>> > Thanks,
> >> >> >>>>>>> > Ashish
> >> >> >>>>>>> >
> >> >> >>>>>>> >
> >> >> >>>>>>> >
> >> >> >>>>>>> > --
> >> >> >>>>>>> > View this message in context:
> >> >> >>>>>>> > http://apache-spark-user-list.1001560.n3.nabble.com/Spark-shell-and-StackOverFlowError-tp24508.html
> >> >> >>>>>>> > Sent from the Apache Spark User List mailing list archive at
> >> >> >>>>>>> > Nabble.com.
> >> >> >>>>>>> >
> >> >> >>>>>>> >
> >> >> >>>>>>> >
> >> >> >>>>>>> >
> >> >> >>>>>>> >
> >> >> >>>>>>> > ---------------------------------------------------------------------
> >> >> >>>>>>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >> >> >>>>>>> > For additional commands, e-mail: user-h...@spark.apache.org
> >> >> >>>>>>> >
> >> >> >>>>>>>
> >> >> >>>>>>>
> >> >> >>>>>>>
> >> >> >>>>>>>
> >> >> >>>>>>>
> >> >> >>>>>>
> >> >> >>>>
> >> >> >>
> >> >> >
>
