[ https://issues.apache.org/jira/browse/BEAM-1737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15954833#comment-15954833 ]
Amit Sela commented on BEAM-1737: --------------------------------- This is just because you are not allowed to use not Serializables such as {{EvaluationContext}} inside {{DStream}} lambdas. This line is simply not necessary and can be removed: https://github.com/apache/beam/blob/master/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java#L436 The step name is set before calling any {{DStream}} either for single/multi output. I'll have a fix ready. > Interpreting a Single-output ParDo as a Multi-output ParDo with a single > output causes serialization failures > ------------------------------------------------------------------------------------------------------------- > > Key: BEAM-1737 > URL: https://issues.apache.org/jira/browse/BEAM-1737 > Project: Beam > Issue Type: Bug > Components: runner-spark > Reporter: Thomas Groh > Priority: Minor > > This is the cause of having a separate path and implementation for > single-output ParDos, even though both go through the same translator. > Partial stacktrace: > Tests run: 9, Failures: 0, Errors: 6, Skipped: 0, Time elapsed: 5.946 sec <<< > FAILURE! - in > org.apache.beam.runners.spark.translation.streaming.CreateStreamTest > [8233/41535] > testLateDataAccumulating(org.apache.beam.runners.spark.translation.streaming.CreateStreamTest) > Time elapsed: 3.593 sec <<< ERROR! > java.lang.RuntimeException: > java.io.NotSerializableException: DStream checkpointing has been enabled but > the DStreams with their functions are not serializable > org.apache.beam.runners.spark.translation.EvaluationContext > Serialization stack: > - object not serializable (class: > org.apache.beam.runners.spark.translation.EvaluationContext, value: > org.apache.beam.runners.spark.translation.EvaluationContext@a8c55d7) > - field (class: > org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$8$1, > name: val$context, type: class > org.apache.beam.runners.spark.translation.EvaluationContext) > - object (class > org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$8$1, > > org.apache.beam.runners.spark.translation.streaming.StreamingTransformTranslator$8$1@44f50940) > - field (class: > org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1, > name: transformFunc$3, type: interface > org.apache.spark.api.java.function.Function) > - object (class > org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$transformToPair$1, > <function1>) > - field (class: > org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21, > name: cleanedF$2, type: interface scala.Function1) > - object (class > org.apache.spark.streaming.dstream.DStream$$anonfun$transform$1$$anonfun$apply$21, > <function2>) > - field (class: > org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5, > name: cleanedF$3, type: interface scala.Function2) > - object (class > org.apache.spark.streaming.dstream.DStream$$anonfun$transform$2$$anonfun$5, > <function2>) > - writeObject data (class: > org.apache.spark.streaming.dstream.DStreamCheckpointData) > - object (class > org.apache.spark.streaming.dstream.DStreamCheckpointData, [ > 0 checkpoint files > ]) > .... > at > org.apache.beam.runners.spark.SparkPipelineResult.runtimeExceptionFrom(SparkPipelineResult.java:60) > at > org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:77) > at > org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:113) > at > org.apache.beam.runners.spark.TestSparkRunner.awaitWatermarksOrTimeout(TestSparkRunner.java:195) > at > org.apache.beam.runners.spark.TestSparkRunner.run(TestSparkRunner.java:127) > at > org.apache.beam.runners.spark.TestSparkRunner.run(TestSparkRunner.java:82) > at org.apache.beam.sdk.Pipeline.run(Pipeline.java:210) > ... -- This message was sent by Atlassian JIRA (v6.3.15#6346)