Vaguely makes sense. :) Wow that's an interesting corner case.

On Wed, Apr 22, 2015 at 1:57 PM, Jean-Pascal Billaud <j...@tellapart.com>
wrote:

> I have now a fair understanding of the situation after looking at javap
> output. So as a reminder:
>
> dstream.map(tuple => {
> val w = StreamState.fetch[K,W](state.prefixKey, tuple._1)
> (tuple._1, (tuple._2, w)) })
>
> And StreamState being a very simple standalone object:
>
> object StreamState {
>   def fetch[K : ClassTag : Ordering, V : ClassTag](prefixKey: String, key:
> K) : Option[V] = None
> }
>
> Basically the serialization failed because the ClassTag[K] came from the
> enclosing class, in which the dstream.map() code is running e.g. :
>
> class A[K : ClassTag](val dstream: DStream[K]) {
>   [...]
>   def fun =
>     dstream.map(tuple => {
>       val w = StreamState.fetch[K,W](state.prefixKey, tuple._1)
>       (tuple._1, (tuple._2, w)) })
>    }
>
> therefore the instance of class A is being serialized and it fails when
> the dstream field call writeObject() when it checks for the graph field...
>
> The fact that graph is not set might be expected given that I have not
> started the context yet...
>
> Cheers,
>
>
> On Tue, Apr 21, 2015 at 6:17 PM, Tathagata Das <t...@databricks.com>
> wrote:
>
>> It is kind of unexpected, i can imagine a real scenario under which it
>> should trigger. But obviously I am missing something :)
>>
>> TD
>>
>> On Tue, Apr 21, 2015 at 4:23 PM, Jean-Pascal Billaud <j...@tellapart.com>
>> wrote:
>>
>>> Sure. But in general, I am assuming this ""Graph is unexpectedly null
>>> when DStream is being serialized" must mean something. Under which
>>> circumstances, such an exception would trigger?
>>>
>>> On Tue, Apr 21, 2015 at 4:12 PM, Tathagata Das <t...@databricks.com>
>>> wrote:
>>>
>>>> Yeah, I am not sure what is going on. The only way to figure to take a
>>>> look at the disassembled bytecodes using javap.
>>>>
>>>> TD
>>>>
>>>> On Tue, Apr 21, 2015 at 1:53 PM, Jean-Pascal Billaud <j...@tellapart.com>
>>>> wrote:
>>>>
>>>>> At this point I am assuming that nobody has an idea... I am still
>>>>> going to give it a last shot just in case it was missed by some people :)
>>>>>
>>>>> Thanks,
>>>>>
>>>>> On Mon, Apr 20, 2015 at 2:20 PM, Jean-Pascal Billaud <j...@tellapart.com
>>>>> > wrote:
>>>>>
>>>>>> Hey, so I start the context at the very end when all the piping is
>>>>>> done. BTW a foreachRDD will be called on the resulting dstream.map() 
>>>>>> right
>>>>>> after that.
>>>>>>
>>>>>> The puzzling thing is why removing the context bounds solve the
>>>>>> problem... What does this exception mean in general?
>>>>>>
>>>>>> On Mon, Apr 20, 2015 at 1:33 PM, Tathagata Das <t...@databricks.com>
>>>>>> wrote:
>>>>>>
>>>>>>> When are you getting this exception? After starting the context?
>>>>>>>
>>>>>>> TD
>>>>>>>
>>>>>>> On Mon, Apr 20, 2015 at 10:44 AM, Jean-Pascal Billaud <
>>>>>>> j...@tellapart.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I am getting this serialization exception and I am not too sure
>>>>>>>> what "Graph is unexpectedly null when DStream is being serialized" 
>>>>>>>> means?
>>>>>>>>
>>>>>>>> 15/04/20 06:12:38 INFO yarn.ApplicationMaster: Final app status:
>>>>>>>> FAILED, exitCode: 15, (reason: User class threw exception: Task not
>>>>>>>> serializable)
>>>>>>>> Exception in thread "Driver" org.apache.spark.SparkException: Task
>>>>>>>> not serializable
>>>>>>>>         at org.apache.spark.util.ClosureCleaner$.
>>>>>>>> ensureSerializable(ClosureCleaner.scala:166)
>>>>>>>>         at org.apache.spark.util.ClosureCleaner$.clean(
>>>>>>>> ClosureCleaner.scala:158)
>>>>>>>>         at org.apache.spark.SparkContext.
>>>>>>>> clean(SparkContext.scala:1435)
>>>>>>>>         at org.apache.spark.streaming.dstream.DStream.map(DStream.
>>>>>>>> scala:438)
>>>>>>>>         [...]
>>>>>>>> Caused by: java.io.NotSerializableException: Graph is unexpectedly
>>>>>>>> null when DStream is being serialized.
>>>>>>>>         at org.apache.spark.streaming.dstream.DStream$anonfun$
>>>>>>>> writeObject$1.apply$mcV$sp(DStream.scala:420)
>>>>>>>>         at org.apache.spark.util.Utils$.
>>>>>>>> tryOrIOException(Utils.scala:985)
>>>>>>>>         at org.apache.spark.streaming.dstream.DStream.writeObject(
>>>>>>>> DStream.scala:403)
>>>>>>>>
>>>>>>>> The operation comes down to something like this:
>>>>>>>>
>>>>>>>> dstream.map(tuple => {
>>>>>>>> val w = StreamState.fetch[K,W](state.prefixKey, tuple._1)
>>>>>>>> (tuple._1, (tuple._2, w)) })
>>>>>>>>
>>>>>>>> And StreamState being a very simple standalone object:
>>>>>>>>
>>>>>>>> object StreamState {
>>>>>>>>   def fetch[K : ClassTag : Ordering, V : ClassTag](prefixKey:
>>>>>>>> String, key: K) : Option[V] = None
>>>>>>>> }
>>>>>>>>
>>>>>>>> However if I remove the context bounds from K in fetch e.g.
>>>>>>>> removing ClassTag and Ordering then everything is fine.
>>>>>>>>
>>>>>>>> If anyone has some pointers, I'd really appreciate it.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to