Aviem Zur created BEAM-2106:
-------------------------------

             Summary: NotSerializableException thrown when serializing EvaluationContext
                 Key: BEAM-2106
                 URL: https://issues.apache.org/jira/browse/BEAM-2106
             Project: Beam
          Issue Type: Bug
          Components: runner-spark
            Reporter: Aviem Zur
            Assignee: Aviem Zur
             Fix For: First stable release


When an {{EvaluationContext}} is serialized as part of checkpointing, a 
{{NotSerializableException}} is thrown.

A test that reproduces this can be found here:
https://gist.github.com/aviemzur/877d128653861ffe1d4107759aebe5b0

Stack trace:
{code}
Caused by: java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable
org.apache.beam.runners.spark.translation.EvaluationContext
Serialization stack:
        - object not serializable (class: org.apache.beam.runners.spark.translation.EvaluationContext, value: org.apache.beam.runners.spark.translation.EvaluationContext@40d2616c)
        - field (class: org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$9$1, name: val$context, type: class org.apache.beam.runners.spark.translation.EvaluationContext)
        - object (class org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$9$1, org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$9$1@317dadcb)
        - field (class: org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1, name: transformFunc$3, type: interface org.apache.spark.api.java.function.Function)
        - object (class org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1, <function1>)
        - field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21, name: cleanedF$2, type: interface scala.Function1)
        - object (class org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21, <function2>)
        - field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5, name: cleanedF$3, type: interface scala.Function2)
        - object (class org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5, <function2>)
        - field (class: org.apache.spark.streaming.dstream.TransformedDStream, name: transformFunc, type: interface scala.Function2)
        - object (class org.apache.spark.streaming.dstream.TransformedDStream, org.apache.spark.streaming.dstream.TransformedDStream@31b0347a)
        - writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
        - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
0 checkpoint files

])
        - writeObject data (class: org.apache.spark.streaming.dstream.DStream)
        - object (class org.apache.spark.streaming.dstream.InternalMapWithStateDStream, org.apache.spark.streaming.dstream.InternalMapWithStateDStream@56dcb4cd)
        - writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
        - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
0 checkpoint files

])
        - writeObject data (class: org.apache.spark.streaming.dstream.DStream)
        - object (class org.apache.spark.streaming.dstream.FilteredDStream, org.apache.spark.streaming.dstream.FilteredDStream@7bd7c8de)
        - writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
        - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
0 checkpoint files

])
        - writeObject data (class: org.apache.spark.streaming.dstream.DStream)
        - object (class org.apache.spark.streaming.dstream.MapWithStateDStreamImpl, org.apache.spark.streaming.dstream.MapWithStateDStreamImpl@768bdfe1)
        - writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
        - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
0 checkpoint files

])
        - writeObject data (class: org.apache.spark.streaming.dstream.DStream)
        - object (class org.apache.spark.streaming.dstream.MappedDStream, org.apache.spark.streaming.dstream.MappedDStream@24ec6d2c)
        - field (class: org.apache.spark.streaming.api.java.JavaDStream, name: dstream, type: class org.apache.spark.streaming.dstream.DStream)
        - object (class org.apache.spark.streaming.api.java.JavaDStream, org.apache.spark.streaming.api.java.JavaDStream@46e31bad)
        - field (class: org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3, name: $outer, type: interface org.apache.spark.streaming.api.java.JavaDStreamLike)
        - object (class org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3, <function1>)
        - field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3, name: cleanedF$1, type: interface scala.Function1)
        - object (class org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3, <function2>)
        - writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
        - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
0 checkpoint files

])
        - writeObject data (class: org.apache.spark.streaming.dstream.DStream)
        - object (class org.apache.spark.streaming.dstream.MappedDStream, org.apache.spark.streaming.dstream.MappedDStream@6c292dc2)
        - writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
        - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
0 checkpoint files

])
        - writeObject data (class: org.apache.spark.streaming.dstream.DStream)
        - object (class org.apache.spark.streaming.dstream.ForEachDStream, org.apache.spark.streaming.dstream.ForEachDStream@1c553a89)
        - writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
        - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
0 checkpoint files

])
        - writeObject data (class: org.apache.spark.streaming.dstream.DStream)
        - object (class org.apache.beam.runners.spark.io.SparkUnboundedSource$ReportingDStream, org.apache.beam.runners.spark.io.SparkUnboundedSource$ReportingDStream@bd05853)
        - element of array (index: 0)
        - array (class [Ljava.lang.Object;, size 16)
        - field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;)
        - object (class scala.collection.mutable.ArrayBuffer, ArrayBuffer(org.apache.beam.runners.spark.io.SparkUnboundedSource$ReportingDStream@bd05853, org.apache.spark.streaming.dstream.ForEachDStream@1c553a89, org.apache.spark.streaming.dstream.ForEachDStream@b8d9fa6))
        - writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
        - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
0 checkpoint files

])
        - writeObject data (class: org.apache.spark.streaming.dstream.DStream)
        - object (class org.apache.beam.runners.spark.io.SourceDStream, org.apache.beam.runners.spark.io.SourceDStream@74797f2b)
        - element of array (index: 0)
        - array (class [Ljava.lang.Object;, size 16)
        - field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;)
        - object (class scala.collection.mutable.ArrayBuffer, ArrayBuffer(org.apache.beam.runners.spark.io.SourceDStream@74797f2b))
        - writeObject data (class: org.apache.spark.streaming.DStreamGraph)
        - object (class org.apache.spark.streaming.DStreamGraph, org.apache.spark.streaming.DStreamGraph@42aa9ffa)
        - field (class: org.apache.spark.streaming.Checkpoint, name: graph, type: class org.apache.spark.streaming.DStreamGraph)
        - object (class org.apache.spark.streaming.Checkpoint, org.apache.spark.streaming.Checkpoint@78ab0194)
        at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:557)
        at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601)
        at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
        at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:624)
        at org.apache.beam.runners.spark.SparkRunner$1.run(SparkRunner.java:212)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
{code}
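
The second entry in the serialization stack ({{val$context}} on {{StreamingTransformTranslator$9$1}}) suggests that an anonymous function class captured the whole {{EvaluationContext}}. The following is a minimal sketch of that capture pattern using plain {{java.io}} serialization, without Spark or Beam. {{CaptureDemo}}, {{Context}}, {{Fn}} and the helper methods are hypothetical stand-ins and are not taken from the Beam code base. The sketch is not meant to describe the actual fix for this issue. It only shows why the capture fails and one common way to avoid it.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CaptureDemo {

    // Hypothetical stand-in for EvaluationContext: deliberately NOT Serializable.
    static class Context {
        String name() {
            return "ctx";
        }
    }

    // Hypothetical stand-in for a Serializable function interface.
    interface Fn extends Serializable {
        String call(String input);
    }

    // The anonymous class captures the whole context as a synthetic field
    // (javac names it val$context), so serializing the function also tries
    // to serialize the Context.
    static Fn capturing(final Context context) {
        return new Fn() {
            @Override
            public String call(String input) {
                return context.name() + ":" + input;
            }
        };
    }

    // Only the serializable value that is needed is read up front,
    // so the anonymous class captures a String instead of the Context.
    static Fn extracting(Context context) {
        final String name = context.name();
        return new Fn() {
            @Override
            public String call(String input) {
                return name + ":" + input;
            }
        };
    }

    // Returns true if Java serialization of the object succeeds.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(isSerializable(capturing(new Context())));  // prints false
        System.out.println(isSerializable(extracting(new Context()))); // prints true
    }
}
```

Other options in the same spirit would be making the captured object {{Serializable}} or marking non-serializable members {{transient}}. Which of these is appropriate for {{EvaluationContext}} is not determined by this sketch.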



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
