Vaguely makes sense. :) Wow, that's an interesting corner case.

On Wed, Apr 22, 2015 at 1:57 PM, Jean-Pascal Billaud <j...@tellapart.com> wrote:
> I have now a fair understanding of the situation after looking at javap
> output. So as a reminder:
>
> dstream.map(tuple => {
>   val w = StreamState.fetch[K,W](state.prefixKey, tuple._1)
>   (tuple._1, (tuple._2, w)) })
>
> And StreamState being a very simple standalone object:
>
> object StreamState {
>   def fetch[K : ClassTag : Ordering, V : ClassTag](prefixKey: String, key: K) : Option[V] = None
> }
>
> Basically the serialization failed because the ClassTag[K] came from the
> enclosing class, in which the dstream.map() code is running, e.g.:
>
> class A[K : ClassTag](val dstream: DStream[K]) {
>   [...]
>   def fun =
>     dstream.map(tuple => {
>       val w = StreamState.fetch[K,W](state.prefixKey, tuple._1)
>       (tuple._1, (tuple._2, w)) })
> }
>
> Therefore the instance of class A is being serialized, and it fails when
> writeObject() is called on the dstream field and checks the graph field...
>
> The fact that graph is not set might be expected given that I have not
> started the context yet...
>
> Cheers,
>
>
> On Tue, Apr 21, 2015 at 6:17 PM, Tathagata Das <t...@databricks.com> wrote:
>
>> It is kind of unexpected, I can't imagine a real scenario under which it
>> should trigger. But obviously I am missing something :)
>>
>> TD
>>
>> On Tue, Apr 21, 2015 at 4:23 PM, Jean-Pascal Billaud <j...@tellapart.com> wrote:
>>
>>> Sure. But in general, I am assuming this "Graph is unexpectedly null
>>> when DStream is being serialized" must mean something. Under which
>>> circumstances would such an exception trigger?
>>>
>>> On Tue, Apr 21, 2015 at 4:12 PM, Tathagata Das <t...@databricks.com> wrote:
>>>
>>>> Yeah, I am not sure what is going on. The only way to figure it out is
>>>> to take a look at the disassembled bytecode using javap.
>>>>
>>>> TD
>>>>
>>>> On Tue, Apr 21, 2015 at 1:53 PM, Jean-Pascal Billaud <j...@tellapart.com> wrote:
>>>>
>>>>> At this point I am assuming that nobody has an idea... I am still
>>>>> going to give it a last shot just in case it was missed by some people :)
>>>>>
>>>>> Thanks,
>>>>>
>>>>> On Mon, Apr 20, 2015 at 2:20 PM, Jean-Pascal Billaud <j...@tellapart.com> wrote:
>>>>>
>>>>>> Hey, so I start the context at the very end, when all the piping is
>>>>>> done. BTW a foreachRDD will be called on the resulting dstream.map()
>>>>>> right after that.
>>>>>>
>>>>>> The puzzling thing is why removing the context bounds solves the
>>>>>> problem... What does this exception mean in general?
>>>>>>
>>>>>> On Mon, Apr 20, 2015 at 1:33 PM, Tathagata Das <t...@databricks.com> wrote:
>>>>>>
>>>>>>> When are you getting this exception? After starting the context?
>>>>>>>
>>>>>>> TD
>>>>>>>
>>>>>>> On Mon, Apr 20, 2015 at 10:44 AM, Jean-Pascal Billaud <j...@tellapart.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I am getting this serialization exception and I am not too sure
>>>>>>>> what "Graph is unexpectedly null when DStream is being serialized"
>>>>>>>> means:
>>>>>>>>
>>>>>>>> 15/04/20 06:12:38 INFO yarn.ApplicationMaster: Final app status:
>>>>>>>> FAILED, exitCode: 15, (reason: User class threw exception: Task not
>>>>>>>> serializable)
>>>>>>>> Exception in thread "Driver" org.apache.spark.SparkException: Task not serializable
>>>>>>>>   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>>>>>>>>   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>>>>>>>>   at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
>>>>>>>>   at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:438)
>>>>>>>>   [...]
>>>>>>>> Caused by: java.io.NotSerializableException: Graph is unexpectedly
>>>>>>>> null when DStream is being serialized.
>>>>>>>>   at org.apache.spark.streaming.dstream.DStream$$anonfun$writeObject$1.apply$mcV$sp(DStream.scala:420)
>>>>>>>>   at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:985)
>>>>>>>>   at org.apache.spark.streaming.dstream.DStream.writeObject(DStream.scala:403)
>>>>>>>>
>>>>>>>> The operation comes down to something like this:
>>>>>>>>
>>>>>>>> dstream.map(tuple => {
>>>>>>>>   val w = StreamState.fetch[K,W](state.prefixKey, tuple._1)
>>>>>>>>   (tuple._1, (tuple._2, w)) })
>>>>>>>>
>>>>>>>> And StreamState being a very simple standalone object:
>>>>>>>>
>>>>>>>> object StreamState {
>>>>>>>>   def fetch[K : ClassTag : Ordering, V : ClassTag](prefixKey: String, key: K) : Option[V] = None
>>>>>>>> }
>>>>>>>>
>>>>>>>> However, if I remove the context bounds from K in fetch, e.g.
>>>>>>>> removing ClassTag and Ordering, then everything is fine.
>>>>>>>>
>>>>>>>> If anyone has some pointers, I'd really appreciate it.
>>>>>>>>
>>>>>>>> Thanks,
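
A minimal sketch of the failure mode and one possible workaround, reconstructed
from the thread. Everything concrete here is an assumption made just so the
example is self-contained: an Int payload type, String standing in for W, an
Ordering bound added to A so fetch's bounds are satisfiable, and the thread's
state.prefixKey collapsed into a plain prefixKey field. The point it
illustrates: a context bound like K : ClassTag desugars into an implicit
ClassTag[K] constructor parameter stored as a field of the enclosing class, so
a closure calling fetch[K, ...] reaches through `this` for that evidence, and
the closure cleaner then tries to serialize the whole instance, including the
dstream field whose writeObject() sees graph == null before the context is
started. Copying the evidence into local vals keeps `this` out of the closure:

import scala.reflect.{classTag, ClassTag}

import org.apache.spark.streaming.dstream.DStream

// Standalone helper, as posted in the thread.
object StreamState {
  def fetch[K : ClassTag : Ordering, V : ClassTag](prefixKey: String, key: K): Option[V] = None
}

// Hypothetical reconstruction of the enclosing class from the thread.
class A[K : ClassTag : Ordering](val dstream: DStream[(K, Int)], val prefixKey: String) {

  // Fails when the closure cleaner runs: fetch[K, String] needs the implicit
  // ClassTag[K]/Ordering[K] evidence stored as fields of this A instance, so
  // the closure captures `this`, and serializing A drags in the dstream field.
  def fun: DStream[(K, (Int, Option[String]))] =
    dstream.map { tuple =>
      val w = StreamState.fetch[K, String](prefixKey, tuple._1)
      (tuple._1, (tuple._2, w))
    }

  // Workaround sketch (untested, not from the thread): copy the evidence and
  // any needed fields into plain local vals and pass them explicitly, so the
  // closure captures only small serializable values instead of all of `this`.
  def funLocal: DStream[(K, (Int, Option[String]))] = {
    val kTag = implicitly[ClassTag[K]]   // ClassTag is Serializable
    val kOrd = implicitly[Ordering[K]]   // Ordering is Serializable
    val localPrefix = prefixKey
    dstream.map { tuple =>
      val w = StreamState.fetch[K, String](localPrefix, tuple._1)(kTag, kOrd, classTag[String])
      (tuple._1, (tuple._2, w))
    }
  }
}

The same local-copy trick applies to any field referenced inside a Spark
closure; here it just happens to be compiler-generated implicit evidence
rather than a field written by hand, which is why the capture of `this` was
invisible in the source and only showed up in the javap output.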