GryoSerializer now uses HadoopPools so that Gryo pools are not constantly re-created (the pool object is reused instead). This has increased the performance of GryoSerializer-based jobs to that of the 3.2.x line prior to bumping to Spark 2.0.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/2321117c Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/2321117c Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/2321117c Branch: refs/heads/master Commit: 2321117c1fb9f5927569d9d61fa28250916b4807 Parents: f0c5a5f Author: Marko A. Rodriguez <[email protected]> Authored: Mon Sep 12 12:22:05 2016 -0600 Committer: Marko A. Rodriguez <[email protected]> Committed: Tue Nov 29 04:54:21 2016 -0700 ---------------------------------------------------------------------- .../hadoop/structure/io/HadoopPools.java | 5 +++ .../spark/structure/io/gryo/GryoSerializer.java | 40 +++++--------------- 2 files changed, 14 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/2321117c/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java ---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java index 5074ad5..392e97d 100644 --- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java +++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java @@ -52,6 +52,11 @@ public final class HadoopPools { HadoopPools.initialize(ConfUtil.makeApacheConfiguration(configuration)); } + public synchronized static void initialize(final GryoPool gryoPool) { + GRYO_POOL = gryoPool; + INITIALIZED = true; + } + public static GryoPool getGryoPool() { if (!INITIALIZED) { HadoopGraph.LOGGER.warn("The " + HadoopPools.class.getSimpleName() + " has not been initialized, using the default pool"); // TODO: this is necessary because we 
can't get the pool intialized in the Merger code of the Hadoop process. http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/2321117c/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java index 6735fe5..00cb702 100644 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java @@ -33,6 +33,7 @@ import org.apache.spark.serializer.SerializerInstance; import org.apache.spark.storage.BlockManagerId; import org.apache.spark.util.SerializableConfiguration; import org.apache.spark.util.collection.CompactBuffer; +import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools; import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable; import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable; import org.apache.tinkerpop.gremlin.spark.process.computer.payload.MessagePayload; @@ -49,9 +50,7 @@ import scala.collection.mutable.WrappedArray; import scala.runtime.BoxedUnit; import java.io.Serializable; -import java.util.ArrayList; import java.util.Collections; -import java.util.List; /** * @author Marko A. 
Rodriguez (http://markorodriguez.com) @@ -61,14 +60,9 @@ public final class GryoSerializer extends Serializer implements Serializable { //private final Option<String> userRegistrator; private final int bufferSize; private final int maxBufferSize; - private final int poolSize; - private final ArrayList<String> ioRegList = new ArrayList<>(); private final boolean referenceTracking; private final boolean registrationRequired; - - private transient GryoPool gryoPool; - public GryoSerializer(final SparkConf sparkConfiguration) { final long bufferSizeKb = sparkConfiguration.getSizeAsKb("spark.kryoserializer.buffer", "64k"); final long maxBufferSizeMb = sparkConfiguration.getSizeAsMb("spark.kryoserializer.buffer.max", "64m"); @@ -85,19 +79,10 @@ public final class GryoSerializer extends Serializer implements Serializable { //this.userRegistrator = sparkConfiguration.getOption("spark.kryo.registrator"); } } - poolSize = sparkConfiguration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, GryoPool.CONFIG_IO_GRYO_POOL_SIZE_DEFAULT); - List<Object> list = makeApacheConfiguration(sparkConfiguration).getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList()); - list.forEach(c -> { - ioRegList.add(c.toString()); - } - ); - } - - private GryoPool createPool(){ - List<Object> list = new ArrayList<>(ioRegList); - return GryoPool.build(). - poolSize(poolSize). - ioRegistries(list). + // create a GryoPool and store it in static HadoopPools + HadoopPools.initialize(GryoPool.build(). + poolSize(sparkConfiguration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, GryoPool.CONFIG_IO_GRYO_POOL_SIZE_DEFAULT)). + ioRegistries(makeApacheConfiguration(sparkConfiguration).getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())). 
initializeMapper(builder -> { try { builder.addCustom(Tuple2.class, new Tuple2Serializer()) @@ -122,13 +107,13 @@ public final class GryoSerializer extends Serializer implements Serializable { .addCustom(SerializableConfiguration.class, new JavaSerializer()) .addCustom(VertexWritable.class, new VertexWritableSerializer()) .addCustom(ObjectWritable.class, new ObjectWritableSerializer()) - .referenceTracking(referenceTracking) - .registrationRequired(registrationRequired); + .referenceTracking(this.referenceTracking) + .registrationRequired(this.registrationRequired); // add these as we find ClassNotFoundExceptions } catch (final ClassNotFoundException e) { throw new IllegalStateException(e); } - }).create(); + }).create()); } public Output newOutput() { @@ -136,14 +121,7 @@ public final class GryoSerializer extends Serializer implements Serializable { } public GryoPool getGryoPool() { - if (gryoPool == null) { - synchronized (this) { - if (gryoPool == null) { - gryoPool = createPool(); - } - } - } - return this.gryoPool; + return HadoopPools.getGryoPool(); } @Override
