Aviem Zur created BEAM-2669:
-------------------------------

             Summary: Kryo serialization exception when DStreams containing 
non-Kryo-serializable data are cached
                 Key: BEAM-2669
                 URL: https://issues.apache.org/jira/browse/BEAM-2669
             Project: Beam
          Issue Type: Bug
          Components: runner-spark
    Affects Versions: 2.0.0, 0.6.0, 0.5.0, 0.4.0
            Reporter: Aviem Zur
            Assignee: Amit Sela


Today, when we detect re-use of a dataset in the Spark runner, we eagerly cache it 
to avoid computing the same data multiple times.
([EvaluationContext.java|https://github.com/apache/beam/blob/v2.0.0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java#L148])
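
For illustration only (this is not the actual {{EvaluationContext}} code and all names are hypothetical), the eager-caching idea looks roughly like this: count how many transforms consume each output, and cache the dataset when it is consumed more than once.
{code:java}
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only, not the actual EvaluationContext code: count the
// consumers of each output and cache eagerly when a dataset is re-used.
public class EagerCachingSketch<K, D> {
  private final Map<K, Integer> consumerCounts = new HashMap<>();
  private final Map<K, D> datasets = new HashMap<>();

  void registerConsumer(K output) {
    consumerCounts.merge(output, 1, Integer::sum);
  }

  void putDataset(K output, D dataset, Runnable cache) {
    if (consumerCounts.getOrDefault(output, 0) > 1) {
      cache.run(); // e.g. RDD#persist(...) or DStream#cache(), see below
    }
    datasets.put(output, dataset);
  }
}
{code}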

When the dataset is bounded, which in Spark is represented by an {{RDD}}, we 
call {{RDD#persist}} and use the storage level provided by the user via 
{{SparkPipelineOptions}}. 
([BoundedDataset.java|https://github.com/apache/beam/blob/v2.0.0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java#L103-L103])
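
A minimal, self-contained sketch of that bounded path using the plain Spark API (not the runner's code; the storage level string is hard-wired here, whereas in the runner it comes from the storage level option in {{SparkPipelineOptions}}):
{code:java}
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;

public class BoundedPersistSketch {
  public static void main(String[] args) {
    JavaSparkContext jsc = new JavaSparkContext(
        new SparkConf().setMaster("local[2]").setAppName("bounded-persist-sketch"));

    JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1, 2, 3));

    // The bounded path: persist with the storage level the user configured
    // (hard-wired here; in the runner it is taken from SparkPipelineOptions).
    rdd.persist(StorageLevel.fromString("MEMORY_ONLY"));

    System.out.println(rdd.count());
    jsc.stop();
  }
}
{code}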

When the dataset is unbounded, which in Spark is represented by a {{DStream}}, 
we call {{DStream.cache()}}, which defaults to persisting the {{DStream}} using 
storage level {{MEMORY_ONLY_SER}}. 
([UnboundedDataset.java|https://github.com/apache/beam/blob/v2.0.0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java#L61])
 
([DStream.scala|https://github.com/apache/spark/blob/v1.6.3/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala#L169])
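
The unbounded path, sketched with the plain Spark Streaming API (again not the runner's code): {{DStream.cache()}} is shorthand for {{persist(StorageLevel.MEMORY_ONLY_SER)}}, so every cached batch goes through Spark's configured serializer. Running this sketch requires a socket source, e.g. {{nc -lk 9999}}.
{code:java}
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class DStreamCacheSketch {
  public static void main(String[] args) throws InterruptedException {
    JavaSparkContext jsc = new JavaSparkContext(
        new SparkConf().setMaster("local[2]").setAppName("dstream-cache-sketch"));
    JavaStreamingContext jssc = new JavaStreamingContext(jsc, Durations.seconds(1));

    JavaDStream<String> lines = jssc.socketTextStream("localhost", 9999);

    // cache() defaults to MEMORY_ONLY_SER, so cached batches are serialized
    // with whatever spark.serializer is configured (Kryo in the Spark runner).
    lines.cache();
    // Equivalent explicit form:
    // lines.persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY_SER());

    lines.print();
    jssc.start();
    jssc.awaitTermination();
  }
}
{code}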

Storage level {{MEMORY_ONLY_SER}} means Spark will serialize the data using its 
configured serializer. Since we hard-code this serializer to be Kryo, the data 
will be serialized using Kryo. 
([SparkContextFactory.java|https://github.com/apache/beam/blob/v2.0.0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java#L99-L99])
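
What the hard-coded serializer setting effectively does, sketched with the plain {{SparkConf}} API (the runner applies this when building its Spark context, regardless of the user's own {{spark.serializer}} setting):
{code:java}
import org.apache.spark.SparkConf;
import org.apache.spark.serializer.KryoSerializer;

public class HardCodedSerializerSketch {
  public static void main(String[] args) {
    // Effectively what the runner does today: force Kryo, overriding whatever
    // spark.serializer the user may have configured for their cluster.
    SparkConf conf = new SparkConf()
        .setAppName("kryo-sketch")
        .set("spark.serializer", KryoSerializer.class.getCanonicalName());

    System.out.println(conf.get("spark.serializer"));
    // -> org.apache.spark.serializer.KryoSerializer
  }
}
{code}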

Due to this, if your {{DStream}} contains non-Kryo-serializable data, you will 
encounter Kryo serialization exceptions and your task will fail.

Possible actions we should consider:
# Remove the hard-coded Spark serializer configuration; this should be taken 
from the user's Spark configuration, as there is no real reason for us to 
interfere with it.
# Use the user's configured storage level from {{SparkPipelineOptions}} when 
caching unbounded datasets ({{DStream}}s), the same as we do for bounded 
datasets (see the sketch after this list).
# Make caching of re-used datasets configurable (enable/disable) in 
{{SparkPipelineOptions}}, although overloading our configuration with more 
options is not something to be taken lightly.
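
A minimal sketch of option 2 (hypothetical helper; it assumes the storage level is carried as the same string the bounded path already reads from {{SparkPipelineOptions}}):
{code:java}
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaDStream;

public class UnboundedPersistSketch {
  // Hypothetical helper: persist an unbounded dataset's DStream with the
  // user-configured storage level instead of DStream#cache()'s default.
  static <T> JavaDStream<T> persistWithUserStorageLevel(
      JavaDStream<T> dstream, String storageLevel) {
    return dstream.persist(StorageLevel.fromString(storageLevel));
  }
}
{code}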



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
