Aviem Zur created BEAM-2106: ------------------------------- Summary: NotSerializableException thrown when serializing EvaluationContext Key: BEAM-2106 URL: https://issues.apache.org/jira/browse/BEAM-2106 Project: Beam Issue Type: Bug Components: runner-spark Reporter: Aviem Zur Assignee: Aviem Zur Fix For: First stable release
When an {{EvaluationContext}} is serialized as part of checkpointing, a {{NotSerializableException}} is thrown. A test that reproduces this can be found here: https://gist.github.com/aviemzur/877d128653861ffe1d4107759aebe5b0 Stack trace: {code} Caused by: java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable org.apache.beam.runners.spark.translation.EvaluationContext Serialization stack: - object not serializable (class: org.apache.beam.runners.spark.translation.EvaluationContext, value: org.apache.beam.runners.spark.translation.EvaluationContext@40d2616c) - field (class: org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$9$1, name: val$context, type: class org.apache.beam.runners.spark.translation.EvaluationContext) - object (class org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$9$1, org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$9$1@317dadcb) - field (class: org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1, name: transformFunc$3, type: interface org.apache.spark.api.java.function.Function) - object (class org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1, <function1>) - field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21, name: cleanedF$2, type: interface scala.Function1) - object (class org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21, <function2>) - field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5, name: cleanedF$3, type: interface scala.Function2) - object (class org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5, <function2>) - field (class: org.apache.spark.streaming.dstream.TransformedDStream, name: transformFunc, type: interface scala.Function2) - object (class 
org.apache.spark.streaming.dstream.TransformedDStream, org.apache.spark.streaming.dstream.TransformedDStream@31b0347a) - writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData) - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [ 0 checkpoint files ]) - writeObject data (class: org.apache.spark.streaming.dstream.DStream) - object (class org.apache.spark.streaming.dstream.InternalMapWithStateDStream, org.apache.spark.streaming.dstream.InternalMapWithStateDStream@56dcb4cd) - writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData) - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [ 0 checkpoint files ]) - writeObject data (class: org.apache.spark.streaming.dstream.DStream) - object (class org.apache.spark.streaming.dstream.FilteredDStream, org.apache.spark.streaming.dstream.FilteredDStream@7bd7c8de) - writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData) - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [ 0 checkpoint files ]) - writeObject data (class: org.apache.spark.streaming.dstream.DStream) - object (class org.apache.spark.streaming.dstream.MapWithStateDStreamImpl, org.apache.spark.streaming.dstream.MapWithStateDStreamImpl@768bdfe1) - writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData) - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [ 0 checkpoint files ]) - writeObject data (class: org.apache.spark.streaming.dstream.DStream) - object (class org.apache.spark.streaming.dstream.MappedDStream, org.apache.spark.streaming.dstream.MappedDStream@24ec6d2c) - field (class: org.apache.spark.streaming.api.java.JavaDStream, name: dstream, type: class org.apache.spark.streaming.dstream.DStream) - object (class org.apache.spark.streaming.api.java.JavaDStream, org.apache.spark.streaming.api.java.JavaDStream@46e31bad) - field (class: 
org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3, name: $outer, type: interface org.apache.spark.streaming.api.java.JavaDStreamLike) - object (class org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3, <function1>) - field (class: org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3, name: cleanedF$1, type: interface scala.Function1) - object (class org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3, <function2>) - writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData) - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [ 0 checkpoint files ]) - writeObject data (class: org.apache.spark.streaming.dstream.DStream) - object (class org.apache.spark.streaming.dstream.MappedDStream, org.apache.spark.streaming.dstream.MappedDStream@6c292dc2) - writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData) - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [ 0 checkpoint files ]) - writeObject data (class: org.apache.spark.streaming.dstream.DStream) - object (class org.apache.spark.streaming.dstream.ForEachDStream, org.apache.spark.streaming.dstream.ForEachDStream@1c553a89) - writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData) - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [ 0 checkpoint files ]) - writeObject data (class: org.apache.spark.streaming.dstream.DStream) - object (class org.apache.beam.runners.spark.io.SparkUnboundedSource$ReportingDStream, org.apache.beam.runners.spark.io.SparkUnboundedSource$ReportingDStream@bd05853) - element of array (index: 0) - array (class [Ljava.lang.Object;, size 16) - field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;) - object (class scala.collection.mutable.ArrayBuffer, 
ArrayBuffer(org.apache.beam.runners.spark.io.SparkUnboundedSource$ReportingDStream@bd05853, org.apache.spark.streaming.dstream.ForEachDStream@1c553a89, org.apache.spark.streaming.dstream.ForEachDStream@b8d9fa6)) - writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData) - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [ 0 checkpoint files ]) - writeObject data (class: org.apache.spark.streaming.dstream.DStream) - object (class org.apache.beam.runners.spark.io.SourceDStream, org.apache.beam.runners.spark.io.SourceDStream@74797f2b) - element of array (index: 0) - array (class [Ljava.lang.Object;, size 16) - field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;) - object (class scala.collection.mutable.ArrayBuffer, ArrayBuffer(org.apache.beam.runners.spark.io.SourceDStream@74797f2b)) - writeObject data (class: org.apache.spark.streaming.DStreamGraph) - object (class org.apache.spark.streaming.DStreamGraph, org.apache.spark.streaming.DStreamGraph@42aa9ffa) - field (class: org.apache.spark.streaming.Checkpoint, name: graph, type: class org.apache.spark.streaming.DStreamGraph) - object (class org.apache.spark.streaming.Checkpoint, org.apache.spark.streaming.Checkpoint@78ab0194) at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:557) at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601) at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600) at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:624) at org.apache.beam.runners.spark.SparkRunner$1.run(SparkRunner.java:212) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) at java.util.concurrent.FutureTask.run(FutureTask.java:262) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)