KryoShimServices now implement .close() so they can relinquish resources. 
KryoShimServiceLoader also implements close() and will close any static shim 
service it maintains. On a force reload, any existing KryoShimService is 
close()d. This was necessary for the test suite as different IoRegistries were 
being loaded and dangling KRYO objects didn't have the loaded registries. Reverted 
GryoMapper to how it was and now the Spark-specific classes in GryoSerializer 
are simply an IoRegistry -- ta da. Going to do GraphSON IoRegistry testing in 
Giraph and Spark and then I think we are done with this branch. cc/ @dalaro.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/b6954f98
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/b6954f98
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/b6954f98

Branch: refs/heads/master
Commit: b6954f98e71bfe120148b016b7f797e7b15b7e5a
Parents: 07503d6
Author: Marko A. Rodriguez <[email protected]>
Authored: Thu Oct 27 05:27:48 2016 -0600
Committer: Marko A. Rodriguez <[email protected]>
Committed: Tue Nov 29 04:57:14 2016 -0700

----------------------------------------------------------------------
 .../gremlin/structure/io/gryo/GryoMapper.java   | 43 +++++-----
 .../io/gryo/kryoshim/KryoShimService.java       |  8 +-
 .../io/gryo/kryoshim/KryoShimServiceLoader.java | 13 ++-
 .../structure/io/HadoopPoolShimService.java     |  7 +-
 .../hadoop/structure/io/HadoopPools.java        |  4 +-
 .../structure/io/AbstractIoRegistryCheck.java   |  2 +
 .../spark/structure/io/gryo/GryoSerializer.java | 85 ++++++++++++--------
 .../io/gryo/IoRegistryAwareKryoSerializer.java  |  2 +-
 .../unshaded/UnshadedKryoShimService.java       |  6 ++
 .../SparkHadoopGraphGryoSerializerProvider.java |  2 +
 .../computer/SparkHadoopGraphProvider.java      |  7 +-
 .../structure/io/SparkIoRegistryCheck.java      |  5 ++
 12 files changed, 120 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b6954f98/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java
----------------------------------------------------------------------
diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java
index d0fc32d..7b3a6b4 100644
--- 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java
@@ -440,6 +440,8 @@ public final class GryoMapper implements Mapper<Kryo> {
             add(GryoTypeReg.of(ProfileStep.ProfileBiOperator.class, 119));
         }};
 
+        private final List<IoRegistry> registries = new ArrayList<>();
+
         /**
          * Starts numbering classes for Gryo serialization at 65536 to leave 
room for future usage by TinkerPop.
          */
@@ -472,23 +474,7 @@ public final class GryoMapper implements Mapper<Kryo> {
         @Override
         public Builder addRegistry(final IoRegistry registry) {
             if (null == registry) throw new IllegalArgumentException("The 
registry cannot be null");
-            final List<Pair<Class, Object>> serializers = 
registry.find(GryoIo.class);
-            serializers.forEach(p -> {
-                if (null == p.getValue1())
-                    addCustom(p.getValue0());
-                else if (p.getValue1() instanceof SerializerShim)
-                    addCustom(p.getValue0(), new 
ShadedSerializerAdapter((SerializerShim) p.getValue1()));
-                else if (p.getValue1() instanceof Serializer)
-                    addCustom(p.getValue0(), (Serializer) p.getValue1());
-                else if (p.getValue1() instanceof Function)
-                    addCustom(p.getValue0(), (Function<Kryo, Serializer>) 
p.getValue1());
-                else
-                    throw new IllegalStateException(String.format(
-                            "Unexpected value provided by %s for serializable 
class %s - expected a parameter in [null, %s (or shim) implementation or 
Function<%s, %s>], but received %s",
-                            registry.getClass().getSimpleName(), 
p.getValue0().getClass().getCanonicalName(),
-                            Serializer.class.getName(), 
Kryo.class.getSimpleName(),
-                            Serializer.class.getSimpleName(), p.getValue1()));
-            });
+            this.registries.add(registry);
             return this;
         }
 
@@ -576,6 +562,27 @@ public final class GryoMapper implements Mapper<Kryo> {
          * Creates a {@code GryoMapper}.
          */
         public GryoMapper create() {
+            // consult the registry if provided and inject registry entries as 
custom classes.
+            registries.forEach(registry -> {
+                final List<Pair<Class, Object>> serializers = 
registry.find(GryoIo.class);
+                serializers.forEach(p -> {
+                    if (null == p.getValue1())
+                        addCustom(p.getValue0());
+                    else if (p.getValue1() instanceof SerializerShim)
+                        addCustom(p.getValue0(), new 
ShadedSerializerAdapter((SerializerShim) p.getValue1()));
+                    else if (p.getValue1() instanceof Serializer)
+                        addCustom(p.getValue0(), (Serializer) p.getValue1());
+                    else if (p.getValue1() instanceof Function)
+                        addCustom(p.getValue0(), (Function<Kryo, Serializer>) 
p.getValue1());
+                    else
+                        throw new IllegalStateException(String.format(
+                                "Unexpected value provided by %s for 
serializable class %s - expected a parameter in [null, %s implementation or 
Function<%s, %s>], but received %s",
+                                registry.getClass().getSimpleName(), 
p.getValue0().getClass().getCanonicalName(),
+                                Serializer.class.getName(), 
Kryo.class.getSimpleName(),
+                                Serializer.class.getSimpleName(), 
p.getValue1()));
+                });
+            });
+
             return new GryoMapper(this);
         }
 
@@ -708,4 +715,4 @@ public final class GryoMapper implements Mapper<Kryo> {
                     .toString();
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b6954f98/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimService.java
----------------------------------------------------------------------
diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimService.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimService.java
index 4d3ece5..4422e1b 100644
--- 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimService.java
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimService.java
@@ -57,7 +57,7 @@ public interface KryoShimService {
     /**
      * Serializes an object to an output stream.  This may flush the output 
stream.
      *
-     * @param o the object to serialize
+     * @param o    the object to serialize
      * @param sink the stream into which the serialized object is written
      */
     public void writeClassAndObject(final Object o, final OutputStream sink);
@@ -92,4 +92,10 @@ public interface KryoShimService {
      * @param conf the configuration to apply to this service's internal 
serializer
      */
     public void applyConfiguration(final Configuration conf);
+
+    /**
+     * Release all resources associated with the shim service.
+     * This is called on a forced reload or when the {@link 
KryoShimServiceLoader} is closed.
+     */
+    public void close();
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b6954f98/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
----------------------------------------------------------------------
diff --git 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
index f41b007..70be7ad 100644
--- 
a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
+++ 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java
@@ -51,12 +51,19 @@ public class KryoShimServiceLoader {
     public static void applyConfiguration(final Configuration configuration) {
         if (null == KryoShimServiceLoader.configuration ||
                 null == KryoShimServiceLoader.cachedShimService ||
-                
!ConfigurationUtils.toString(KryoShimServiceLoader.configuration).equals(ConfigurationUtils.toString(configuration)))
 {
+                !KryoShimServiceLoader.configuration.getKeys().hasNext()) {
             KryoShimServiceLoader.configuration = configuration;
             load(true);
         }
     }
 
+    public static void close() {
+        if (null != cachedShimService)
+            cachedShimService.close();
+        cachedShimService = null;
+        configuration = null;
+    }
+
     /**
      * Return a reference to the shim service.  This method may return a 
cached shim service
      * unless {@code forceReload} is true.  Calls to this method need not be 
externally
@@ -72,6 +79,10 @@ public class KryoShimServiceLoader {
         if (null != cachedShimService && !forceReload)
             return cachedShimService;
 
+        // if a service is already loaded, close it
+        if (null != cachedShimService)
+            cachedShimService.close();
+
         // if the configuration is null, try and load the configuration from 
System.properties
         if (null == configuration)
             configuration = 
SystemUtil.getSystemPropertiesConfiguration("tinkerpop", true);

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b6954f98/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java
 
b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java
index a52eac4..db79d97 100644
--- 
a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java
+++ 
b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java
@@ -20,11 +20,9 @@ package org.apache.tinkerpop.gremlin.hadoop.structure.io;
 
 import org.apache.commons.configuration.Configuration;
 import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService;
-import org.apache.tinkerpop.shaded.kryo.Kryo;
 import org.apache.tinkerpop.shaded.kryo.io.Input;
 import org.apache.tinkerpop.shaded.kryo.io.Output;
 
-import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 
@@ -50,6 +48,11 @@ public class HadoopPoolShimService implements 
KryoShimService {
     }
 
     @Override
+    public void close() {
+        HadoopPools.close();
+    }
+
+    @Override
     public int getPriority() {
         return 0;
     }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b6954f98/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
----------------------------------------------------------------------
diff --git 
a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
 
b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
index 8f7b97c..e652509 100644
--- 
a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
+++ 
b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPools.java
@@ -44,8 +44,8 @@ public final class HadoopPools {
             GRYO_POOL = GryoPool.build().
                     
poolSize(configuration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, 256)).
                     
ioRegistries(configuration.getList(GryoPool.CONFIG_IO_REGISTRY, 
Collections.emptyList())).
-                    //initializeMapper(m -> m.registrationRequired(false)).
-                            create();
+                    initializeMapper(m -> m.registrationRequired(false)).
+                    create();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b6954f98/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/AbstractIoRegistryCheck.java
----------------------------------------------------------------------
diff --git 
a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/AbstractIoRegistryCheck.java
 
b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/AbstractIoRegistryCheck.java
index 9002d57..db947c2 100644
--- 
a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/AbstractIoRegistryCheck.java
+++ 
b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/AbstractIoRegistryCheck.java
@@ -65,6 +65,8 @@ public abstract class AbstractIoRegistryCheck extends 
AbstractGremlinTest {
         
graph.configuration().setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, 
GryoOutputFormat.class.getCanonicalName());
         
graph.configuration().setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, 
input.getAbsolutePath());
         graph.configuration().setProperty(GryoPool.CONFIG_IO_REGISTRY, 
ToyIoRegistry.class.getCanonicalName());
+        final GryoOutputFormat inputFormat = new GryoOutputFormat();
+        inputFormat.getRecordWriter()
         final GryoRecordWriter writer = new GryoRecordWriter(new 
DataOutputStream(new FileOutputStream(input)), 
ConfUtil.makeHadoopConfiguration(graph.configuration()));
         validateIoRegistryGraph(graph, graphComputerClass, writer, 
GryoInputFormat.class);
         assertTrue(input.delete());

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b6954f98/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
----------------------------------------------------------------------
diff --git 
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
index 1b9fa3b..3bdf81f 100644
--- 
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
+++ 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
@@ -41,6 +41,8 @@ import 
org.apache.tinkerpop.gremlin.spark.process.computer.payload.MessagePayloa
 import 
org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
 import 
org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewOutgoingPayload;
 import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewPayload;
+import org.apache.tinkerpop.gremlin.structure.io.AbstractIoRegistry;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoIo;
 import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool;
 import org.apache.tinkerpop.shaded.kryo.io.Output;
 import org.apache.tinkerpop.shaded.kryo.serializers.ExternalizableSerializer;
@@ -51,7 +53,9 @@ import scala.collection.mutable.WrappedArray;
 import scala.runtime.BoxedUnit;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -67,8 +71,8 @@ public final class GryoSerializer extends Serializer 
implements Serializable {
     public GryoSerializer(final SparkConf sparkConfiguration) {
         final long bufferSizeKb = 
sparkConfiguration.getSizeAsKb("spark.kryoserializer.buffer", "64k");
         final long maxBufferSizeMb = 
sparkConfiguration.getSizeAsMb("spark.kryoserializer.buffer.max", "64m");
-        referenceTracking = 
sparkConfiguration.getBoolean("spark.kryo.referenceTracking", true);
-        registrationRequired = 
sparkConfiguration.getBoolean(Constants.SPARK_KRYO_REGISTRATION_REQUIRED, 
false);
+        this.referenceTracking = 
sparkConfiguration.getBoolean("spark.kryo.referenceTracking", true);
+        this.registrationRequired = 
sparkConfiguration.getBoolean(Constants.SPARK_KRYO_REGISTRATION_REQUIRED, 
false);
         if (bufferSizeKb >= ByteUnit.GiB.toKiB(2L)) {
             throw new IllegalArgumentException("spark.kryoserializer.buffer 
must be less than 2048 mb, got: " + bufferSizeKb + " mb.");
         } else {
@@ -81,40 +85,16 @@ public final class GryoSerializer extends Serializer 
implements Serializable {
             }
         }
         // create a GryoPool and store it in static HadoopPools
+        final List<Object> ioRegistries = new ArrayList<>();
+        
ioRegistries.addAll(makeApacheConfiguration(sparkConfiguration).getList(GryoPool.CONFIG_IO_REGISTRY,
 Collections.emptyList()));
+        ioRegistries.add(SparkIoRegistry.class.getCanonicalName().replace("." 
+ SparkIoRegistry.class.getSimpleName(), "$" + 
SparkIoRegistry.class.getSimpleName()));
         HadoopPools.initialize(GryoPool.build().
                 
poolSize(sparkConfiguration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, 
GryoPool.CONFIG_IO_GRYO_POOL_SIZE_DEFAULT)).
-                
ioRegistries(makeApacheConfiguration(sparkConfiguration).getList(GryoPool.CONFIG_IO_REGISTRY,
 Collections.emptyList())).
-                initializeMapper(builder -> {
-                    try {
-                        builder.addCustom(Tuple2.class, new Tuple2Serializer())
-                                .addCustom(Tuple2[].class)
-                                .addCustom(Tuple3.class, new 
Tuple3Serializer())
-                                .addCustom(Tuple3[].class)
-                                .addCustom(CompactBuffer.class, new 
CompactBufferSerializer())
-                                .addCustom(CompactBuffer[].class)
-                                .addCustom(CompressedMapStatus.class)
-                                .addCustom(BlockManagerId.class)
-                                .addCustom(HighlyCompressedMapStatus.class, 
new ExternalizableSerializer())   // externalizable implemented so its okay
-                                .addCustom(TorrentBroadcast.class)
-                                .addCustom(PythonBroadcast.class)
-                                .addCustom(BoxedUnit.class)
-                                
.addCustom(Class.forName("scala.reflect.ClassTag$$anon$1"), new 
JavaSerializer())
-                                
.addCustom(Class.forName("scala.reflect.ManifestFactory$$anon$1"), new 
JavaSerializer())
-                                .addCustom(WrappedArray.ofRef.class, new 
WrappedArraySerializer())
-                                .addCustom(MessagePayload.class)
-                                .addCustom(ViewIncomingPayload.class)
-                                .addCustom(ViewOutgoingPayload.class)
-                                .addCustom(ViewPayload.class)
-                                .addCustom(SerializableConfiguration.class, 
new JavaSerializer())
-                                .addCustom(VertexWritable.class, new 
VertexWritableSerializer())
-                                .addCustom(ObjectWritable.class, new 
ObjectWritableSerializer())
-                                .referenceTracking(this.referenceTracking)
-                                
.registrationRequired(this.registrationRequired);
-                        // add these as we find ClassNotFoundExceptions
-                    } catch (final ClassNotFoundException e) {
-                        throw new IllegalStateException(e);
-                    }
-                }).create());
+                ioRegistries(ioRegistries).
+                initializeMapper(builder ->
+                        builder.referenceTracking(this.referenceTracking).
+                                
registrationRequired(this.registrationRequired)).
+                create());
     }
 
     public Output newOutput() {
@@ -138,4 +118,41 @@ public final class GryoSerializer extends Serializer 
implements Serializable {
         }
         return apacheConfiguration;
     }
+
+    public static class SparkIoRegistry extends AbstractIoRegistry {
+        private static final SparkIoRegistry INSTANCE = new SparkIoRegistry();
+
+        private SparkIoRegistry() {
+            try {
+                super.register(GryoIo.class, Tuple2.class, new 
Tuple2Serializer());
+                super.register(GryoIo.class, Tuple2[].class, null);
+                super.register(GryoIo.class, Tuple3.class, new 
Tuple3Serializer());
+                super.register(GryoIo.class, Tuple3[].class, null);
+                super.register(GryoIo.class, CompactBuffer.class, new 
CompactBufferSerializer());
+                super.register(GryoIo.class, CompactBuffer[].class, null);
+                super.register(GryoIo.class, CompressedMapStatus.class, null);
+                super.register(GryoIo.class, BlockManagerId.class, null);
+                super.register(GryoIo.class, HighlyCompressedMapStatus.class, 
new ExternalizableSerializer());  // externalizable implemented so its okay
+                super.register(GryoIo.class, TorrentBroadcast.class, null);
+                super.register(GryoIo.class, PythonBroadcast.class, null);
+                super.register(GryoIo.class, BoxedUnit.class, null);
+                super.register(GryoIo.class, 
Class.forName("scala.reflect.ClassTag$$anon$1"), new JavaSerializer());
+                super.register(GryoIo.class, 
Class.forName("scala.reflect.ManifestFactory$$anon$1"), new JavaSerializer());
+                super.register(GryoIo.class, WrappedArray.ofRef.class, new 
WrappedArraySerializer());
+                super.register(GryoIo.class, MessagePayload.class, null);
+                super.register(GryoIo.class, ViewIncomingPayload.class, null);
+                super.register(GryoIo.class, ViewOutgoingPayload.class, null);
+                super.register(GryoIo.class, ViewPayload.class, null);
+                super.register(GryoIo.class, SerializableConfiguration.class, 
new JavaSerializer());
+                super.register(GryoIo.class, VertexWritable.class, new 
VertexWritableSerializer());
+                super.register(GryoIo.class, ObjectWritable.class, new 
ObjectWritableSerializer());
+            } catch (final ClassNotFoundException e) {
+                throw new IllegalStateException(e);
+            }
+        }
+
+        public static SparkIoRegistry getInstance() {
+            return INSTANCE;
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b6954f98/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java
----------------------------------------------------------------------
diff --git 
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java
 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java
index ba6d001..1385a5b 100644
--- 
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java
+++ 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java
@@ -70,7 +70,7 @@ public final class IoRegistryAwareKryoSerializer extends 
KryoSerializer {
             else if (null != type.getShadedSerializer() && 
type.getShadedSerializer() instanceof ShadedSerializerAdapter)
                 kryo.register(type.getTargetClass(), new 
UnshadedSerializerAdapter(((ShadedSerializerAdapter) 
type.getShadedSerializer()).getSerializerShim()), type.getId());
             else
-                kryo.register(type.getTargetClass(), type.getId());
+                kryo.register(type.getTargetClass(), 
kryo.getDefaultSerializer(type.getTargetClass()), type.getId());
         }
         return kryo;
     }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b6954f98/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java
----------------------------------------------------------------------
diff --git 
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java
 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java
index 4932acb..caf5268 100644
--- 
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java
+++ 
b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java
@@ -90,6 +90,12 @@ public class UnshadedKryoShimService implements 
KryoShimService {
         initialize(configuration);
     }
 
+    @Override
+    public void close() {
+        INITIALIZED = false;
+        KRYOS.clear();
+    }
+
     private LinkedBlockingQueue<Kryo> initialize(final Configuration 
configuration) {
         // DCL is safe in this case due to volatility
         if (!INITIALIZED) {

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b6954f98/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphGryoSerializerProvider.java
----------------------------------------------------------------------
diff --git 
a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphGryoSerializerProvider.java
 
b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphGryoSerializerProvider.java
index 0e7fe0d..6ebe626 100644
--- 
a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphGryoSerializerProvider.java
+++ 
b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphGryoSerializerProvider.java
@@ -23,6 +23,7 @@ import org.apache.tinkerpop.gremlin.LoadGraphWith;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.spark.structure.Spark;
 import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer;
+import 
org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader;
 
 import java.util.Map;
 
@@ -34,6 +35,7 @@ public final class SparkHadoopGraphGryoSerializerProvider 
extends SparkHadoopGra
     public Map<String, Object> getBaseConfiguration(final String graphName, 
final Class<?> test, final String testMethodName, final LoadGraphWith.GraphData 
loadGraphWith) {
         if 
(!SparkHadoopGraphGryoSerializerProvider.class.getCanonicalName().equals(System.getProperty(PREVIOUS_SPARK_PROVIDER,
 null))) {
             Spark.close();
+            KryoShimServiceLoader.close();
             System.setProperty(PREVIOUS_SPARK_PROVIDER, 
SparkHadoopGraphGryoSerializerProvider.class.getCanonicalName());
         }
         final Map<String, Object> config = 
super.getBaseConfiguration(graphName, test, testMethodName, loadGraphWith);

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b6954f98/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
----------------------------------------------------------------------
diff --git 
a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
 
b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
index 8bc6519..dcec3f8 100644
--- 
a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
+++ 
b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
@@ -27,7 +27,6 @@ import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.HadoopGraphProvider;
 import 
org.apache.tinkerpop.gremlin.hadoop.groovy.plugin.HadoopGremlinPluginCheck;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorageCheck;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.ToyIoRegistry;
 import org.apache.tinkerpop.gremlin.process.computer.Computer;
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
 import 
org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
@@ -41,11 +40,10 @@ import 
org.apache.tinkerpop.gremlin.process.traversal.step.map.ProgramTest;
 import org.apache.tinkerpop.gremlin.spark.structure.Spark;
 import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD;
 import 
org.apache.tinkerpop.gremlin.spark.structure.io.SparkContextStorageCheck;
-import org.apache.tinkerpop.gremlin.spark.structure.io.SparkIoRegistryCheck;
 import org.apache.tinkerpop.gremlin.spark.structure.io.ToyGraphInputRDD;
 import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoRegistrator;
 import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool;
+import 
org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader;
 
 import java.util.Map;
 
@@ -61,6 +59,7 @@ public class SparkHadoopGraphProvider extends 
HadoopGraphProvider {
     public Map<String, Object> getBaseConfiguration(final String graphName, 
final Class<?> test, final String testMethodName, final LoadGraphWith.GraphData 
loadGraphWith) {
         if (this.getClass().equals(SparkHadoopGraphProvider.class) && 
!SparkHadoopGraphProvider.class.getCanonicalName().equals(System.getProperty(PREVIOUS_SPARK_PROVIDER,
 null))) {
             Spark.close();
+            KryoShimServiceLoader.close();
             System.setProperty(PREVIOUS_SPARK_PROVIDER, 
SparkHadoopGraphProvider.class.getCanonicalName());
         }
 
@@ -98,8 +97,6 @@ public class SparkHadoopGraphProvider extends 
HadoopGraphProvider {
         config.put(Constants.SPARK_SERIALIZER, 
KryoSerializer.class.getCanonicalName());
         config.put(Constants.SPARK_KRYO_REGISTRATOR, 
GryoRegistrator.class.getCanonicalName());
         config.put(Constants.SPARK_KRYO_REGISTRATION_REQUIRED, true);
-        // TODO: why do I need this here?!
-        config.put(GryoPool.CONFIG_IO_REGISTRY, 
ToyIoRegistry.class.getCanonicalName());
         return config;
     }
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b6954f98/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkIoRegistryCheck.java
----------------------------------------------------------------------
diff --git 
a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkIoRegistryCheck.java
 
b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkIoRegistryCheck.java
index f748d86..54ed4ed 100644
--- 
a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkIoRegistryCheck.java
+++ 
b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkIoRegistryCheck.java
@@ -24,6 +24,9 @@ import 
org.apache.tinkerpop.gremlin.hadoop.structure.io.AbstractIoRegistryCheck;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools;
 import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
 import org.apache.tinkerpop.gremlin.spark.structure.Spark;
+import 
org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedKryoShimService;
+import 
org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader;
+import org.apache.tinkerpop.gremlin.util.SystemUtil;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -39,6 +42,7 @@ public class SparkIoRegistryCheck extends 
AbstractIoRegistryCheck {
         SparkContextStorage.open("local[4]");
         Spark.close();
         HadoopPools.close();
+        KryoShimServiceLoader.close();
     }
 
     @After
@@ -47,6 +51,7 @@ public class SparkIoRegistryCheck extends 
AbstractIoRegistryCheck {
         Spark.create("local[4]");
         Spark.close();
         HadoopPools.close();
+        KryoShimServiceLoader.close();
     }
 
     @Test

Reply via email to