Now able to test both shim and non-shim models in Spark. Also got configuration with ProgramTest working.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/e7003635 Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/e7003635 Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/e7003635 Branch: refs/heads/TINKERPOP-1278 Commit: e7003635e27c625b3f30492111f20f4fe4e24eb5 Parents: 0cd31bf Author: Marko A. Rodriguez <okramma...@gmail.com> Authored: Mon Jun 6 14:52:53 2016 -0600 Committer: Marko A. Rodriguez <okramma...@gmail.com> Committed: Mon Jun 6 14:52:53 2016 -0600 ---------------------------------------------------------------------- .../structure/io/gryo/GryoSerializers.java | 8 +- .../structure/io/gryo/GryoRegistrator.java | 90 ++++++++++++++++---- .../computer/SparkHadoopGraphProvider.java | 11 ++- 3 files changed, 88 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e7003635/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoSerializers.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoSerializers.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoSerializers.java index 16fbe85..2042a4a 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoSerializers.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoSerializers.java @@ -47,7 +47,7 @@ public final class GryoSerializers { /** * Serializes any {@link Edge} implementation encountered to a {@link DetachedEdge}. 
*/ - final static class EdgeSerializer implements SerializerShim<Edge> { + public final static class EdgeSerializer implements SerializerShim<Edge> { @Override public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, Edge edge) { kryo.writeClassAndObject(output, DetachedFactory.detach(edge, true)); @@ -63,7 +63,7 @@ public final class GryoSerializers { /** * Serializes any {@link Vertex} implementation encountered to an {@link DetachedVertex}. */ - final static class VertexSerializer implements SerializerShim<Vertex> { + public final static class VertexSerializer implements SerializerShim<Vertex> { @Override public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, Vertex vertex) { kryo.writeClassAndObject(output, DetachedFactory.detach(vertex, true)); @@ -78,7 +78,7 @@ public final class GryoSerializers { /** * Serializes any {@link Property} implementation encountered to an {@link DetachedProperty}. */ - final static class PropertySerializer implements SerializerShim<Property> { + public final static class PropertySerializer implements SerializerShim<Property> { @Override public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, Property property) { kryo.writeClassAndObject(output, property instanceof VertexProperty ? DetachedFactory.detach((VertexProperty) property, true) : DetachedFactory.detach(property)); @@ -93,7 +93,7 @@ public final class GryoSerializers { /** * Serializes any {@link VertexProperty} implementation encountered to an {@link DetachedVertexProperty}. 
*/ - final static class VertexPropertySerializer implements SerializerShim<VertexProperty> { + public final static class VertexPropertySerializer implements SerializerShim<VertexProperty> { @Override public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, VertexProperty vertexProperty) { kryo.writeClassAndObject(output, DetachedFactory.detach(vertexProperty, true)); http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e7003635/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoRegistrator.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoRegistrator.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoRegistrator.java index 1ae8c5c..9563408 100644 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoRegistrator.java +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoRegistrator.java @@ -22,19 +22,44 @@ import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.Serializer; import com.esotericsoftware.kryo.serializers.JavaSerializer; import org.apache.spark.serializer.KryoRegistrator; +import org.apache.spark.util.SerializableConfiguration; +import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration; +import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopEdge; +import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopProperty; +import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopVertex; +import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopVertexProperty; import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable; import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable; +import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph; +import 
org.apache.tinkerpop.gremlin.process.traversal.Path; +import org.apache.tinkerpop.gremlin.process.traversal.step.util.ImmutablePath; +import org.apache.tinkerpop.gremlin.process.traversal.step.util.MutablePath; +import org.apache.tinkerpop.gremlin.process.traversal.traverser.B_LP_O_P_S_SE_SL_Traverser; +import org.apache.tinkerpop.gremlin.process.traversal.traverser.B_LP_O_S_SE_SL_Traverser; +import org.apache.tinkerpop.gremlin.process.traversal.traverser.B_O_S_SE_SL_Traverser; +import org.apache.tinkerpop.gremlin.process.traversal.traverser.B_O_Traverser; +import org.apache.tinkerpop.gremlin.process.traversal.traverser.LP_O_OB_P_S_SE_SL_Traverser; +import org.apache.tinkerpop.gremlin.process.traversal.traverser.LP_O_OB_S_SE_SL_Traverser; +import org.apache.tinkerpop.gremlin.process.traversal.traverser.O_OB_S_SE_SL_Traverser; +import org.apache.tinkerpop.gremlin.process.traversal.traverser.O_Traverser; import org.apache.tinkerpop.gremlin.spark.process.computer.payload.MessagePayload; import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload; import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewOutgoingPayload; import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewPayload; import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedSerializerAdapter; +import org.apache.tinkerpop.gremlin.structure.Edge; +import org.apache.tinkerpop.gremlin.structure.Property; +import org.apache.tinkerpop.gremlin.structure.Vertex; +import org.apache.tinkerpop.gremlin.structure.VertexProperty; import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoMapper; +import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoSerializers; import org.apache.tinkerpop.gremlin.structure.io.gryo.TypeRegistration; import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.SerializerShim; import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph; +import 
org.apache.tinkerpop.gremlin.structure.util.star.StarGraphSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import scala.collection.mutable.WrappedArray; import java.util.*; @@ -55,11 +80,11 @@ public class GryoRegistrator implements KryoRegistrator { * Register TinkerPop's classes with the supplied {@link Kryo} instance * while honoring optional overrides and optional class blacklist ("blackset"?). * - * @param kryo the Kryo serializer instance with which to register types + * @param kryo the Kryo serializer instance with which to register types * @param serializerOverrides serializer mappings that override this class's defaults - * @param blacklist classes which should not be registered at all, even if there is an override entry - * or if they would be registered by this class by default (does not affect Kryo's - * built-in registrations, e.g. String.class). + * @param blacklist classes which should not be registered at all, even if there is an override entry + * or if they would be registered by this class by default (does not affect Kryo's + * built-in registrations, e.g. String.class). */ public void registerClasses(Kryo kryo, Map<Class<?>, Serializer<?>> serializerOverrides, Set<Class<?>> blacklist) { // Apply TinkerPop type registrations copied from GyroSerializer's constructor @@ -111,11 +136,11 @@ public class GryoRegistrator implements KryoRegistrator { } else { // There's supposed to be a check in GryoMapper that prevents this from happening log.error("GryoMapper's default serialization registration for {} is a {}. " + - "This is probably a bug in TinkerPop (this is not a valid default registration). " + - "I am configuring Spark to use Kryo's default serializer for this class, " + - "but this may cause serialization failures at runtime.", - tr.getTargetClass(), - org.apache.tinkerpop.shaded.kryo.Serializer.class.getCanonicalName()); + "This is probably a bug in TinkerPop (this is not a valid default registration). 
" + + "I am configuring Spark to use Kryo's default serializer for this class, " + + "but this may cause serialization failures at runtime.", + tr.getTargetClass(), + org.apache.tinkerpop.shaded.kryo.Serializer.class.getCanonicalName()); kryo.register(tr.getTargetClass()); } } else if (null != serializerShim) { @@ -127,12 +152,12 @@ public class GryoRegistrator implements KryoRegistrator { } else if (null != functionOfShadedKryo) { // As with shaded serializers, there's supposed to be a check in GryoMapper that prevents this from happening log.error("GryoMapper's default serialization registration for {} is a Function<{},{}>. " + - "This is probably a bug in TinkerPop (this is not a valid default registration). " + - "I am configuring Spark to use Kryo's default serializer instead of this function, " + - "but this may cause serialization failures at runtime.", - tr.getTargetClass(), - org.apache.tinkerpop.shaded.kryo.Kryo.class.getCanonicalName(), - org.apache.tinkerpop.shaded.kryo.Serializer.class.getCanonicalName()); + "This is probably a bug in TinkerPop (this is not a valid default registration). " + + "I am configuring Spark to use Kryo's default serializer instead of this function, " + + "but this may cause serialization failures at runtime.", + tr.getTargetClass(), + org.apache.tinkerpop.shaded.kryo.Kryo.class.getCanonicalName(), + org.apache.tinkerpop.shaded.kryo.Serializer.class.getCanonicalName()); kryo.register(tr.getTargetClass()); } else { // Register all other classes with the default behavior (FieldSerializer) @@ -164,13 +189,46 @@ public class GryoRegistrator implements KryoRegistrator { // duplication, but it would be a bit cumbersome to do so without disturbing // the ordering of the existing entries in that constructor, since not all // of the entries are for TinkerPop (and the ordering is significant). 
+ if (Boolean.valueOf(System.getProperty("is.testing", "false"))) { + try { + m.put(Class.forName("scala.reflect.ClassTag$$anon$1"), new JavaSerializer()); + m.put(Class.forName("scala.reflect.ManifestFactory$$anon$1"), new JavaSerializer()); + } catch (final ClassNotFoundException e) { + throw new IllegalStateException(e.getMessage(), e); + } + } + m.put(WrappedArray.ofRef.class, null); m.put(MessagePayload.class, null); m.put(ViewIncomingPayload.class, null); m.put(ViewOutgoingPayload.class, null); m.put(ViewPayload.class, null); m.put(VertexWritable.class, new UnshadedSerializerAdapter<>(new VertexWritableSerializer())); m.put(ObjectWritable.class, new UnshadedSerializerAdapter<>(new ObjectWritableSerializer<>())); - + // + m.put(HadoopConfiguration.class, null); + // + m.put(HadoopVertex.class, new UnshadedSerializerAdapter<>(new GryoSerializers.VertexSerializer())); + m.put(HadoopVertexProperty.class, new UnshadedSerializerAdapter<>(new GryoSerializers.VertexPropertySerializer())); + m.put(HadoopProperty.class, new UnshadedSerializerAdapter<>(new GryoSerializers.PropertySerializer())); + m.put(HadoopEdge.class, new UnshadedSerializerAdapter<>(new GryoSerializers.EdgeSerializer())); + // + m.put(ComputerGraph.ComputerVertex.class, new UnshadedSerializerAdapter<>(new GryoSerializers.VertexSerializer())); + m.put(ComputerGraph.ComputerVertexProperty.class, new UnshadedSerializerAdapter<>(new GryoSerializers.VertexPropertySerializer())); + m.put(ComputerGraph.ComputerProperty.class, new UnshadedSerializerAdapter<>(new GryoSerializers.PropertySerializer())); + m.put(ComputerGraph.ComputerEdge.class, new UnshadedSerializerAdapter<>(new GryoSerializers.EdgeSerializer())); + // + m.put(StarGraph.StarEdge.class, new UnshadedSerializerAdapter<>(new GryoSerializers.EdgeSerializer())); + m.put(StarGraph.StarVertex.class, new UnshadedSerializerAdapter<>(new GryoSerializers.VertexSerializer())); + m.put(StarGraph.StarProperty.class, new UnshadedSerializerAdapter<>(new 
GryoSerializers.PropertySerializer())); + m.put(StarGraph.StarVertexProperty.class, new UnshadedSerializerAdapter<>(new GryoSerializers.VertexPropertySerializer())); + // + m.put(MutablePath.class, new UnshadedSerializerAdapter<>(new GryoSerializers.PathSerializer())); + m.put(ImmutablePath.class, new UnshadedSerializerAdapter<>(new GryoSerializers.PathSerializer())); + try { + m.put(Class.forName(ImmutablePath.class.getCanonicalName() + "$TailPath"), new UnshadedSerializerAdapter<>(new GryoSerializers.PathSerializer())); + } catch (final ClassNotFoundException e) { + throw new IllegalStateException(e.getMessage(), e); + } return m; } http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e7003635/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java index 85552ce..7737d1e 100644 --- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java @@ -26,6 +26,7 @@ import org.apache.tinkerpop.gremlin.hadoop.Constants; import org.apache.tinkerpop.gremlin.hadoop.HadoopGraphProvider; import org.apache.tinkerpop.gremlin.hadoop.groovy.plugin.HadoopGremlinPluginCheck; import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorageCheck; +import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPoolShimService; import org.apache.tinkerpop.gremlin.process.computer.Computer; import org.apache.tinkerpop.gremlin.process.computer.GraphComputer; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; @@ -40,10 
+41,14 @@ import org.apache.tinkerpop.gremlin.spark.structure.io.SparkContextStorageCheck; import org.apache.tinkerpop.gremlin.spark.structure.io.ToyGraphInputRDD; import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoRegistrator; import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer; +import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedKryoShimService; import org.apache.tinkerpop.gremlin.structure.Graph; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader; import java.util.Map; +import static org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader.SHIM_CLASS_SYSTEM_PROPERTY; + /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ @@ -83,9 +88,13 @@ public final class SparkHadoopGraphProvider extends HadoopGraphProvider { config.put("spark.master", "local[4]"); - if (false) { + if (RANDOM.nextBoolean()) { + System.setProperty(SHIM_CLASS_SYSTEM_PROPERTY, HadoopPoolShimService.class.getCanonicalName()); + KryoShimServiceLoader.load(true); config.put("spark.serializer", GryoSerializer.class.getCanonicalName()); } else { + System.setProperty(SHIM_CLASS_SYSTEM_PROPERTY, UnshadedKryoShimService.class.getCanonicalName()); + KryoShimServiceLoader.load(true); config.put("spark.serializer", KryoSerializer.class.getCanonicalName()); config.put("spark.kryo.registrator", GryoRegistrator.class.getCanonicalName()); }