[BEAM-809] Create a KryoRegistrator for the SparkRunner. Use Class#getName() instead of Class#getCanonicalName().
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/13b83858 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/13b83858 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/13b83858 Branch: refs/heads/python-sdk Commit: 13b83858746356068a6d618e04da6839e837d28c Parents: 53fe3ee Author: Sela <ans...@paypal.com> Authored: Mon Oct 24 22:35:39 2016 +0300 Committer: Sela <ans...@paypal.com> Committed: Wed Oct 26 18:53:28 2016 +0300 ---------------------------------------------------------------------- runners/spark/pom.xml | 23 ++++++++++ .../coders/BeamSparkRunnerRegistrator.java | 46 ++++++++++++++++++++ .../spark/translation/SparkContextFactory.java | 5 ++- 3 files changed, 73 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/13b83858/runners/spark/pom.xml ---------------------------------------------------------------------- diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml index ccec3c6..458205a 100644 --- a/runners/spark/pom.xml +++ b/runners/spark/pom.xml @@ -147,6 +147,29 @@ <scope>provided</scope> </dependency> <dependency> + <groupId>com.esotericsoftware.kryo</groupId> + <artifactId>kryo</artifactId> + <version>2.21</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>de.javakaffee</groupId> + <artifactId>kryo-serializers</artifactId> + <version>0.39</version> + <exclusions> + <!-- Use Spark's Kryo --> + <exclusion> + <groupId>com.esotericsoftware</groupId> + <artifactId>kryo</artifactId> + </exclusion> + <!-- We only really need the serializer implementations --> + <exclusion> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> <groupId>com.google.code.findbugs</groupId> <artifactId>jsr305</artifactId> 
<version>1.3.9</version> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/13b83858/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java new file mode 100644 index 0000000..0e62781 --- /dev/null +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.beam.runners.spark.coders;
+
+import com.esotericsoftware.kryo.Kryo;
+import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer;
+import de.javakaffee.kryoserializers.guava.ImmutableListSerializer;
+import de.javakaffee.kryoserializers.guava.ImmutableMapSerializer;
+import de.javakaffee.kryoserializers.guava.ImmutableMultimapSerializer;
+import de.javakaffee.kryoserializers.guava.ImmutableSetSerializer;
+import de.javakaffee.kryoserializers.guava.ReverseListSerializer;
+import org.apache.spark.serializer.KryoRegistrator;
+
+/**
+ * Registers custom {@link com.esotericsoftware.kryo.Serializer}s for unmodifiable and Guava
+ * immutable collections, which the Beam SDK uses. Installed via {@code spark.kryo.registrator}.
+ */
+public class BeamSparkRunnerRegistrator implements KryoRegistrator {
+
+  @Override
+  public void registerClasses(Kryo kryo) {
+    UnmodifiableCollectionsSerializer.registerSerializers(kryo);
+    // Guava collection serializers (kryo-serializers guava package).
+    ImmutableListSerializer.registerSerializers(kryo);
+    ImmutableSetSerializer.registerSerializers(kryo);
+    ImmutableMapSerializer.registerSerializers(kryo);
+    ImmutableMultimapSerializer.registerSerializers(kryo);
+    ReverseListSerializer.registerSerializers(kryo);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/13b83858/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
index 4877f6e..ee2104a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
@@ -19,6 +19,7 @@
 package org.apache.beam.runners.spark.translation;
 
 import org.apache.beam.runners.spark.SparkPipelineOptions;
+import
org.apache.beam.runners.spark.coders.BeamSparkRunnerRegistrator; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.serializer.KryoSerializer; @@ -85,7 +86,9 @@ public final class SparkContextFactory { conf.setMaster(options.getSparkMaster()); } conf.setAppName(options.getAppName()); - conf.set("spark.serializer", KryoSerializer.class.getCanonicalName()); + // register immutable collections serializers because the SDK uses them. + conf.set("spark.kryo.registrator", BeamSparkRunnerRegistrator.class.getName()); + conf.set("spark.serializer", KryoSerializer.class.getName()); return new JavaSparkContext(conf); } }