GryoSerializer now uses HadoopPools so that Gryo pools are not constantly re-created
(they are reused instead). This has increased the performance of GryoSerializer-based
jobs to match that of the 3.2.x line prior to bumping to Spark 2.0.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/2321117c
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/2321117c
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/2321117c

Branch: refs/heads/master
Commit: 2321117c1fb9f5927569d9d61fa28250916b4807
Parents: f0c5a5f
Author: Marko A. Rodriguez <[email protected]>
Authored: Mon Sep 12 12:22:05 2016 -0600
Committer: Marko A. Rodriguez <[email protected]>
Committed: Tue Nov 29 04:54:21 2016 -0700

----------------------------------------------------------------------
 .../hadoop/structure/io/HadoopPools.java        |  5 +++
 .../spark/structure/io/gryo/GryoSerializer.java | 40 +++++---------------
 2 files changed, 14 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/2321117c/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
----------------------------------------------------------------------
diff --git 
a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
 
b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
index 5074ad5..392e97d 100644
--- 
a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
+++ 
b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
@@ -52,6 +52,11 @@ public final class HadoopPools {
         
HadoopPools.initialize(ConfUtil.makeApacheConfiguration(configuration));
     }
 
+    public synchronized static void initialize(final GryoPool gryoPool) {
+        GRYO_POOL = gryoPool;
+        INITIALIZED = true;
+    }
+
     public static GryoPool getGryoPool() {
         if (!INITIALIZED) {
             HadoopGraph.LOGGER.warn("The " + HadoopPools.class.getSimpleName() 
+ " has not been initialized, using the default pool");     // TODO: this is 
necessary because we can't get the pool intialized in the Merger code of the 
Hadoop process.

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/2321117c/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
----------------------------------------------------------------------
diff --git 
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
index 6735fe5..00cb702 100644
--- 
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
+++ 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
@@ -33,6 +33,7 @@ import org.apache.spark.serializer.SerializerInstance;
 import org.apache.spark.storage.BlockManagerId;
 import org.apache.spark.util.SerializableConfiguration;
 import org.apache.spark.util.collection.CompactBuffer;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
 import 
org.apache.tinkerpop.gremlin.spark.process.computer.payload.MessagePayload;
@@ -49,9 +50,7 @@ import scala.collection.mutable.WrappedArray;
 import scala.runtime.BoxedUnit;
 
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.List;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -61,14 +60,9 @@ public final class GryoSerializer extends Serializer 
implements Serializable {
     //private final Option<String> userRegistrator;
     private final int bufferSize;
     private final int maxBufferSize;
-    private final int poolSize;
-    private final ArrayList<String> ioRegList = new ArrayList<>();
     private final boolean referenceTracking;
     private final boolean registrationRequired;
 
-
-    private transient GryoPool gryoPool;
-
     public GryoSerializer(final SparkConf sparkConfiguration) {
         final long bufferSizeKb = 
sparkConfiguration.getSizeAsKb("spark.kryoserializer.buffer", "64k");
         final long maxBufferSizeMb = 
sparkConfiguration.getSizeAsMb("spark.kryoserializer.buffer.max", "64m");
@@ -85,19 +79,10 @@ public final class GryoSerializer extends Serializer 
implements Serializable {
                 //this.userRegistrator = 
sparkConfiguration.getOption("spark.kryo.registrator");
             }
         }
-        poolSize = 
sparkConfiguration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, 
GryoPool.CONFIG_IO_GRYO_POOL_SIZE_DEFAULT);
-        List<Object> list = 
makeApacheConfiguration(sparkConfiguration).getList(GryoPool.CONFIG_IO_REGISTRY,
 Collections.emptyList());
-        list.forEach(c -> {
-                    ioRegList.add(c.toString());
-                }
-        );
-    }
-
-    private GryoPool createPool(){
-        List<Object> list = new ArrayList<>(ioRegList);
-        return GryoPool.build().
-                poolSize(poolSize).
-                ioRegistries(list).
+        // create a GryoPool and store it in static HadoopPools
+        HadoopPools.initialize(GryoPool.build().
+                
poolSize(sparkConfiguration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, 
GryoPool.CONFIG_IO_GRYO_POOL_SIZE_DEFAULT)).
+                
ioRegistries(makeApacheConfiguration(sparkConfiguration).getList(GryoPool.CONFIG_IO_REGISTRY,
 Collections.emptyList())).
                 initializeMapper(builder -> {
                     try {
                         builder.addCustom(Tuple2.class, new Tuple2Serializer())
@@ -122,13 +107,13 @@ public final class GryoSerializer extends Serializer 
implements Serializable {
                                 .addCustom(SerializableConfiguration.class, 
new JavaSerializer())
                                 .addCustom(VertexWritable.class, new 
VertexWritableSerializer())
                                 .addCustom(ObjectWritable.class, new 
ObjectWritableSerializer())
-                                .referenceTracking(referenceTracking)
-                                .registrationRequired(registrationRequired);
+                                .referenceTracking(this.referenceTracking)
+                                
.registrationRequired(this.registrationRequired);
                         // add these as we find ClassNotFoundExceptions
                     } catch (final ClassNotFoundException e) {
                         throw new IllegalStateException(e);
                     }
-                }).create();
+                }).create());
     }
 
     public Output newOutput() {
@@ -136,14 +121,7 @@ public final class GryoSerializer extends Serializer 
implements Serializable {
     }
 
     public GryoPool getGryoPool() {
-        if (gryoPool == null) {
-            synchronized (this) {
-                if (gryoPool == null) {
-                    gryoPool = createPool();
-                }
-            }
-        }
-        return this.gryoPool;
+        return HadoopPools.getGryoPool();
     }
 
     @Override

Reply via email to