Ashish:
Can you post the complete stack trace for the NotSerializableException?

Cheers

On Mon, Aug 31, 2015 at 8:49 AM, Ashish Shrowty <ashish.shro...@gmail.com>
wrote:

> bcItemsIdx is just a broadcast variable constructed out of Array[(String)]
> .. it holds the item ids and I use it for indexing the MatrixEntry objects
>
>
> On Mon, Aug 31, 2015 at 10:41 AM Sean Owen <so...@cloudera.com> wrote:
>
>> It's not clear; that error is different still and somehow suggests
>> you're serializing a stream somewhere. I'd look at what's inside
>> bcItemsIdx as that is not shown here.
>>
>> On Mon, Aug 31, 2015 at 3:34 PM, Ashish Shrowty <ashish.shro...@gmail.com> wrote:
>> > Sean,
>> >
>> > Thanks for your comments. What I was really trying to do was transform an
>> > RDD[(userid,itemid,ratings)] into a RowMatrix so that I can do some column
>> > similarity calculations while exploring the data before building some
>> > models. To do that I first need to convert the user and item ids into
>> > their respective indexes, which I intended to do by passing an array into
>> > the closure. That is where I got stuck with the StackOverflowError while
>> > trying to figure out where it happens. The actual error I got was slightly
>> > different (Caused by: java.io.NotSerializableException:
>> > java.io.ObjectInputStream). Investigating it led me to the earlier code
>> > snippet that I posted. This is again because of the bcItemsIdx variable
>> > being passed into the closure. The code below works if I don't pass in the
>> > variable and simply use a constant like 10 in its place. The code thus far -
>> >
>> > // rdd below is RDD[(String,String,Double)]
>> > // bcItemsIdx below is Broadcast[Array[String]], an array of item ids
>> > val gRdd = rdd.map { case (user, item, rating) => (user, (item, rating)) }.groupByKey
>> > val idxRdd = gRdd.zipWithIndex
>> > val cm = new CoordinateMatrix(
>> >     idxRdd.flatMap[MatrixEntry](e => {
>> >         e._1._2.map(item => {
>> >             // <- This is where I get the serialization error, passing in the index
>> >             MatrixEntry(e._2, bcItemsIdx.value.indexOf(item._1), item._2)
>> >             // MatrixEntry(e._2, 10, item._2) // <- This works
>> >         })
>> >     })
>> > )
>> > val rm = cm.toRowMatrix
>> > val simMatrix = rm.columnSimilarities()
>> >
>> > I would like to make this work in the Spark shell as I am still exploring
>> > the data. Let me know if there is an alternate way of constructing the
>> > RowMatrix.
>> >
>> > Thanks and appreciate all the help!
>> >
>> > Ashish
>> >
>> > On Mon, Aug 31, 2015 at 3:41 AM Sean Owen <so...@cloudera.com> wrote:
>> >>
>> >> Yeah I see that now. I think it fails immediately because the map
>> >> operation does try to clean and/or verify the serialization of the
>> >> closure upfront.
>> >>
>> >> I'm not quite sure what is going on, but I think it's some strange
>> >> interaction between how you're building up the list and what the
>> >> resulting representation happens to be like, and how the closure
>> >> cleaner works, which can't be perfect. The shell also introduces an
>> >> extra layer of issues.
>> >>
>> >> For example, the slightly more canonical approaches work fine:
>> >>
>> >> import scala.collection.mutable.MutableList
>> >> val lst = MutableList[(String,String,Double)]()
>> >> (0 to 10000).foreach(i => lst :+ ("10", "10", i.toDouble))
>> >>
>> >> or just
>> >>
>> >> val lst = (0 to 10000).map(i => ("10", "10", i.toDouble))
>> >>
>> >> If you just need this to work, maybe those are better alternatives anyway.
>> >> You can also check whether it works without the shell, as I suspect
>> >> that's a factor.
>> >>
>> >> It's not an error in Spark per se but saying that something's default
>> >> Java serialization graph is very deep, so it's like the code you wrote
>> >> plus the closure cleaner ends up pulling in some huge linked list and
>> >> serializing it the direct and unuseful way.
>> >>
>> >> If you have an idea about exactly why it's happening you can open a
>> >> JIRA, but arguably it's something that's nice to just work but isn't
>> >> to do with Spark per se. Or, have a look at others related to the
>> >> closure and shell and you may find this is related to other known
>> >> behavior.
>> >>
>> >>
>> >> On Sun, Aug 30, 2015 at 8:08 PM, Ashish Shrowty
>> >> <ashish.shro...@gmail.com> wrote:
>> >> > Sean .. does the code below work for you in the Spark shell? Ted got the
>> >> > same error -
>> >> >
>> >> > val a=10
>> >> > val lst = MutableList[(String,String,Double)]()
>> >> > Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
>> >> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>> >> >
>> >> > -Ashish
>> >> >
>> >> >
>> >> > On Sun, Aug 30, 2015 at 2:52 PM Sean Owen <so...@cloudera.com> wrote:
>> >> >>
>> >> >> I'm not sure how to reproduce it. This code does not produce an error
>> >> >> in master.
>> >> >>
>> >> >> On Sun, Aug 30, 2015 at 7:26 PM, Ashish Shrowty
>> >> >> <ashish.shro...@gmail.com> wrote:
>> >> >> > Do you think I should create a JIRA?
>> >> >> >
>> >> >> >
>> >> >> > On Sun, Aug 30, 2015 at 12:56 PM Ted Yu <yuzhih...@gmail.com> wrote:
>> >> >> >>
>> >> >> >> I got StackOverflowError as well :-(
>> >> >> >>
>> >> >> >> On Sun, Aug 30, 2015 at 9:47 AM, Ashish Shrowty
>> >> >> >> <ashish.shro...@gmail.com> wrote:
>> >> >> >>>
>> >> >> >>> Yep .. I tried that too earlier. Doesn't make a difference. Are you
>> >> >> >>> able to replicate on your side?
>> >> >> >>>
>> >> >> >>>
>> >> >> >>> On Sun, Aug 30, 2015 at 12:08 PM Ted Yu <yuzhih...@gmail.com> wrote:
>> >> >> >>>>
>> >> >> >>>> I see.
>> >> >> >>>>
>> >> >> >>>> What about using the following in place of variable a?
>> >> >> >>>>
>> >> >> >>>> http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables
>> >> >> >>>>
>> >> >> >>>> Cheers
>> >> >> >>>>
>> >> >> >>>> On Sun, Aug 30, 2015 at 8:54 AM, Ashish Shrowty
>> >> >> >>>> <ashish.shro...@gmail.com> wrote:
>> >> >> >>>>>
>> >> >> >>>>> @Sean - Agreed that there is no action, but I still get the
>> >> >> >>>>> StackOverflowError, it's very weird
>> >> >> >>>>>
>> >> >> >>>>> @Ted - Variable a is just an int - val a = 10 ... The error happens
>> >> >> >>>>> when I try to pass a variable into the closure. The example you have
>> >> >> >>>>> above works fine since there is no variable being passed into the
>> >> >> >>>>> closure from the shell.
>> >> >> >>>>>
>> >> >> >>>>> -Ashish
>> >> >> >>>>>
>> >> >> >>>>> On Sun, Aug 30, 2015 at 9:55 AM Ted Yu <yuzhih...@gmail.com>
>> >> >> >>>>> wrote:
>> >> >> >>>>>>
>> >> >> >>>>>> Using Spark shell :
>> >> >> >>>>>>
>> >> >> >>>>>> scala> import scala.collection.mutable.MutableList
>> >> >> >>>>>> import scala.collection.mutable.MutableList
>> >> >> >>>>>>
>> >> >> >>>>>> scala> val lst = MutableList[(String,String,Double)]()
>> >> >> >>>>>> lst: scala.collection.mutable.MutableList[(String, String, Double)] = MutableList()
>> >> >> >>>>>>
>> >> >> >>>>>> scala> Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
>> >> >> >>>>>>
>> >> >> >>>>>> scala> val rdd=sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>> >> >> >>>>>> <console>:27: error: not found: value a
>> >> >> >>>>>>        val rdd=sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>> >> >> >>>>>>                                           ^
>> >> >> >>>>>>
>> >> >> >>>>>> scala> val rdd=sc.makeRDD(lst).map(i=> if(i._1==10) 1 else 0)
>> >> >> >>>>>> rdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:27
>> >> >> >>>>>>
>> >> >> >>>>>> scala> rdd.count()
>> >> >> >>>>>> ...
>> >> >> >>>>>> 15/08/30 06:53:40 INFO DAGScheduler: Job 0 finished: count at <console>:30, took 0.478350 s
>> >> >> >>>>>> res1: Long = 10000
>> >> >> >>>>>>
>> >> >> >>>>>> Ashish:
>> >> >> >>>>>> Please refine your example to mimic more closely what your code
>> >> >> >>>>>> actually did.
>> >> >> >>>>>>
>> >> >> >>>>>> Thanks
>> >> >> >>>>>>
>> >> >> >>>>>> On Sun, Aug 30, 2015 at 12:24 AM, Sean Owen <so...@cloudera.com> wrote:
>> >> >> >>>>>>>
>> >> >> >>>>>>> That can't cause any error, since there is no action in your first
>> >> >> >>>>>>> snippet. Even calling count on the result doesn't cause an error.
>> >> >> >>>>>>> You must be executing something different.
>> >> >> >>>>>>>
>> >> >> >>>>>>> On Sun, Aug 30, 2015 at 4:21 AM, ashrowty <ashish.shro...@gmail.com> wrote:
>> >> >> >>>>>>> > I am running the Spark shell (1.2.1) in local mode and I have a simple
>> >> >> >>>>>>> > RDD[(String,String,Double)] with about 10,000 objects in it. I get a
>> >> >> >>>>>>> > StackOverflowError each time I try to run the following code (the code
>> >> >> >>>>>>> > itself is just representative of other logic where I need to pass in a
>> >> >> >>>>>>> > variable). I tried broadcasting the variable too, but no luck .. I am
>> >> >> >>>>>>> > missing something basic here -
>> >> >> >>>>>>> >
>> >> >> >>>>>>> > val rdd = sc.makeRDD(List(<Data read from file>))
>> >> >> >>>>>>> > val a=10
>> >> >> >>>>>>> > rdd.map(r => if (a==10) 1 else 0)
>> >> >> >>>>>>> >
>> >> >> >>>>>>> > This throws -
>> >> >> >>>>>>> >
>> >> >> >>>>>>> > java.lang.StackOverflowError
>> >> >> >>>>>>> >     at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:318)
>> >> >> >>>>>>> >     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1133)
>> >> >> >>>>>>> >     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>> >> >> >>>>>>> >     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>> >> >> >>>>>>> >     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>> >> >> >>>>>>> >     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>> >> >> >>>>>>> >     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>> >> >> >>>>>>> >     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>> >> >> >>>>>>> >     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>> >> >> >>>>>>> > ...
>> >> >> >>>>>>> > ...
>> >> >> >>>>>>> >
>> >> >> >>>>>>> > More experiments .. this works -
>> >> >> >>>>>>> >
>> >> >> >>>>>>> > val lst = Range(0,10000).map(i=>("10","10",i:Double)).toList
>> >> >> >>>>>>> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>> >> >> >>>>>>> >
>> >> >> >>>>>>> > But the code below doesn't, and throws the StackOverflowError -
>> >> >> >>>>>>> >
>> >> >> >>>>>>> > val lst = MutableList[(String,String,Double)]()
>> >> >> >>>>>>> > Range(0,10000).foreach(i=>lst+=(("10","10",i:Double)))
>> >> >> >>>>>>> > sc.makeRDD(lst).map(i=> if(a==10) 1 else 0)
>> >> >> >>>>>>> >
>> >> >> >>>>>>> > Any help appreciated!
>> >> >> >>>>>>> >
>> >> >> >>>>>>> > Thanks,
>> >> >> >>>>>>> > Ashish
>> >> >> >>>>>>> >
>> >> >> >>>>>>> > --
>> >> >> >>>>>>> > View this message in context:
>> >> >> >>>>>>> > http://apache-spark-user-list.1001560.n3.nabble.com/Spark-shell-and-StackOverFlowError-tp24508.html
>> >> >> >>>>>>> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> >> >> >>>>>>> >
>> >> >> >>>>>>> > ---------------------------------------------------------------------
>> >> >> >>>>>>> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> >> >> >>>>>>> > For additional commands, e-mail: user-h...@spark.apache.org
>> >> >> >>>>>>> >
>> >> >> >>>>>>>
>> >> >> >>>>>>>
>> >> >> >>>>>>
>> >> >> >>>>
>> >> >> >>
>> >> >> >
>>
>
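
A note on the "very deep serialization graph" explanation quoted above: the following is a minimal, Spark-free sketch of how default Java serialization can overflow the stack on a long linked structure, while a flat array of the same size serializes without trouble. The Node class, the DeepGraphDemo object, and the chain length of 1,000,000 are illustrative assumptions and not code from this thread. It does not claim to reproduce the exact Spark shell failure.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Hypothetical singly linked node, standing in for any recursively linked structure.
final class Node(val value: Int, var next: Node) extends Serializable

object DeepGraphDemo {
  // Serialize with default Java serialization and report whether the stack overflowed.
  def overflows(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); false }
    catch { case _: StackOverflowError => true }
  }

  // Build a chain of n nodes iteratively, so construction itself never recurses.
  def chain(n: Int): Node = {
    var head: Node = null
    var i = 0
    while (i < n) { head = new Node(i, head); i += 1 }
    head
  }

  def main(args: Array[String]): Unit = {
    val n = 1000000 // assumed size, chosen to exceed typical default stack depths
    println(s"linked chain overflows: ${overflows(chain(n))}")
    println(s"flat array overflows:   ${overflows(Array.tabulate(n)(identity))}")
  }
}
```

Default serialization follows each `next` reference with a nested writeObject0 call, so the recursion depth grows with the chain length. An array of primitives is written in a loop instead, which is one reason converting a linked collection with toList, toVector, or toArray before handing it to Spark may sidestep this kind of error.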

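
On the indexing step described in the thread (turning user and item ids into row and column indexes), here is a small sketch in plain Scala of building the lookup once as a Map rather than calling indexOf for every rating. The names ItemIndexSketch, indexItems, and toEntries are hypothetical, and the sample data is made up. This only illustrates the indexing logic and is not claimed to resolve the NotSerializableException discussed above.

```scala
object ItemIndexSketch {
  // Map each item id to its position once, instead of scanning with indexOf per rating.
  def indexItems(itemIds: Array[String]): Map[String, Int] =
    itemIds.zipWithIndex.toMap

  // Turn (user, item, rating) triples into (rowIndex, colIndex, rating) entries.
  // Users are numbered in order of first appearance; ratings for unknown items are dropped.
  def toEntries(ratings: Seq[(String, String, Double)],
                itemIndex: Map[String, Int]): Seq[(Long, Long, Double)] = {
    val userIndex = ratings.map(_._1).distinct.zipWithIndex.toMap
    ratings.flatMap { case (user, item, rating) =>
      itemIndex.get(item).map(col => (userIndex(user).toLong, col.toLong, rating))
    }
  }

  def main(args: Array[String]): Unit = {
    val items = Array("i1", "i2", "i3")
    val ratings = Seq(("u1", "i1", 4.0), ("u1", "i3", 2.0), ("u2", "i2", 5.0))
    // prints (0,0,4.0), (0,2,2.0), (1,1,5.0), one per line
    toEntries(ratings, indexItems(items)).foreach(println)
  }
}
```

In a Spark job, each resulting triple could be wrapped as MatrixEntry(row, col, value), and the Map could be broadcast in place of the array. Whether that changes the serialization behavior seen in the shell is untested here.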