[ https://issues.apache.org/jira/browse/BEAM-2669?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16138063#comment-16138063 ]
ASF GitHub Bot commented on BEAM-2669:
--------------------------------------

GitHub user kobisalant opened a pull request:

    https://github.com/apache/beam/pull/3748

    BEAM-2669 Kryo serialization exception when DStreams containing non-Kryo-serializable data are cached

    remove kryo Serializer hard coded settings and add Serializable implementation to window/state/timer classes

Follow this checklist to help us incorporate your contribution quickly and easily:

- [ ] Make sure there is a [JIRA issue](https://issues.apache.org/jira/projects/BEAM/issues/) filed for the change (usually before you start working on it). Trivial changes like typos do not require a JIRA issue. Your pull request should address just this issue, without pulling in other changes.
- [ ] Each commit in the pull request should have a meaningful subject line and body.
- [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue.
- [ ] Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
- [ ] Run `mvn clean verify` to make sure basic checks pass. A more thorough check will be performed on your pull request automatically.
- [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
---

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kobisalant/incubator-beam BEAM-2669-Kryo-serialization-exception-when-dstream-cached

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/beam/pull/3748.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3748

----
commit f3cbbdf9c43caab3b9c497a49b6aa69f564d1462
Author: ksalant <kobi.sal...@gmail.com>
Date:   2017-08-23T08:23:11Z

    remove kryo Serializer hard coded settings and add Serializable implementation to window/state/timer classes
----

> Kryo serialization exception when DStreams containing non-Kryo-serializable data are cached
> -------------------------------------------------------------------------------------------
>
>                 Key: BEAM-2669
>                 URL: https://issues.apache.org/jira/browse/BEAM-2669
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>    Affects Versions: 0.4.0, 0.5.0, 0.6.0, 2.0.0
>            Reporter: Aviem Zur
>            Assignee: Kobi Salant
>
> Today, when we detect re-use of a dataset in a pipeline in the Spark runner, we eagerly cache it to avoid calculating the same data multiple times.
> ([EvaluationContext.java|https://github.com/apache/beam/blob/v2.0.0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java#L148])
> When the dataset is bounded, which in Spark is represented by an {{RDD}}, we call {{RDD#persist}} and use the storage level provided by the user via {{SparkPipelineOptions}}.
> ([BoundedDataset.java|https://github.com/apache/beam/blob/v2.0.0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/BoundedDataset.java#L103-L103])
> When the dataset is unbounded, which in Spark is represented by a {{DStream}}, we call {{DStream.cache()}}, which defaults to persisting the {{DStream}} using storage level {{MEMORY_ONLY_SER}}.
> ([UnboundedDataset.java|https://github.com/apache/beam/blob/v2.0.0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/UnboundedDataset.java#L61])
> ([DStream.scala|https://github.com/apache/spark/blob/v1.6.3/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala#L169])
> Storage level {{MEMORY_ONLY_SER}} means Spark will serialize the data using its configured serializer. Since we configure this serializer to be Kryo in a hard-coded fashion, the data will be serialized using Kryo.
> ([SparkContextFactory.java|https://github.com/apache/beam/blob/v2.0.0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java#L99-L99])
> Due to this, if your {{DStream}} contains non-Kryo-serializable data, you will encounter Kryo serialization exceptions and your task will fail.
> Possible actions we should consider:
> # Remove the hard-coded Spark serializer configuration; this should be taken from the user's configuration of Spark, and there is no real reason for us to interfere with it.
> # Use the user's configured storage level from {{SparkPipelineOptions}} when caching unbounded datasets ({{DStream}}s), the same as we do for bounded datasets.
> # Make caching of re-used datasets configurable (enable/disable) in {{SparkPipelineOptions}}, although overloading our configuration with more options is not to be taken lightly.

-- 
This message was sent by Atlassian JIRA
(v6.4.14#64029)
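The PR's second change (adding a {{Serializable}} implementation to the window/state/timer classes) matters because once the hard-coded Kryo setting is removed, Spark falls back to whatever serializer the user configured, which is Java serialization by default, and Java serialization rejects any class that does not implement {{java.io.Serializable}}. A minimal sketch using only the JDK (no Spark; the state classes here are hypothetical stand-ins for the real Beam classes):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationDemo {
    // Hypothetical stand-in for a window/state/timer class that lacks Serializable.
    static class NonSerializableState {
        final int value;
        NonSerializableState(int value) { this.value = value; }
    }

    // Same shape, but implementing Serializable, as the PR does for the real classes.
    static class SerializableState implements Serializable {
        private static final long serialVersionUID = 1L;
        final int value;
        SerializableState(int value) { this.value = value; }
    }

    // Returns true if plain Java serialization accepts the object.
    static boolean canJavaSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(canJavaSerialize(new NonSerializableState(42))); // false
        System.out.println(canJavaSerialize(new SerializableState(42)));    // true
    }
}
```

This is the mirror image of the bug in the issue: Kryo can fail on classes Java serialization handles, and Java serialization fails on classes that omit {{Serializable}}, so deferring to the user's serializer only works once the cached classes are serializable both ways.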