Lots of good stuff here -- we finally have testing of IoRegistry in SparkGraphComputer. Added ToyPoint and TestIoRegistry as test fixtures. Realized there were a bunch of needless .flush() calls in the GryoSerializer serializers (Spark)... perhaps that is why things are slower than with KryoSerializer. Really cleaned up IoRegistryAwareKryoSerializer. Added getSerializerShim() methods to ShadedSerializerAdapter and UnshadedSerializerAdapter. I'm a stud. cc/ @dalaro
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/0b799806 Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/0b799806 Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/0b799806 Branch: refs/heads/master Commit: 0b799806120ce49122a76be055d6d53e11896cfa Parents: a0fa7c6 Author: Marko A. Rodriguez <[email protected]> Authored: Wed Oct 26 14:21:42 2016 -0600 Committer: Marko A. Rodriguez <[email protected]> Committed: Tue Nov 29 04:57:14 2016 -0700 ---------------------------------------------------------------------- .../io/gryo/kryoshim/KryoShimServiceLoader.java | 2 +- .../shaded/ShadedSerializerAdapter.java | 6 +- .../gremlin/hadoop/HadoopGraphProvider.java | 1 - .../io/gryo/CompactBufferSerializer.groovy | 2 - .../io/gryo/IoRegistryAwareKryoSerializer.java | 39 +++---- .../io/gryo/ObjectWritableSerializer.java | 1 - .../structure/io/gryo/Tuple2Serializer.java | 2 - .../structure/io/gryo/Tuple3Serializer.java | 3 - .../io/gryo/VertexWritableSerializer.java | 1 - .../io/gryo/WrappedArraySerializer.java | 1 - .../unshaded/UnshadedSerializerAdapter.java | 18 ++-- .../gremlin/spark/AbstractSparkTest.java | 7 +- .../structure/io/gryo/GryoIoRegistryTest.java | 101 +++++++++++++++++++ .../spark/structure/io/gryo/TestIoRegistry.java | 40 ++++++++ .../spark/structure/io/gryo/ToyPoint.java | 75 ++++++++++++++ 15 files changed, 255 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0b799806/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java index 5f50f9e..9287b10 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java @@ -114,7 +114,7 @@ public class KryoShimServiceLoader { throw new IllegalStateException("Unable to load KryoShimService"); // once the shim service is defined, configure it - log.info("Configuring KryoShimService {} with following configuration:\n####################\n{}\n####################", + log.info("Configuring KryoShimService {} with following configuration:\n#######START########\n{}\n########END#########", cachedShimService.getClass().getCanonicalName(), ConfigurationUtils.toString(configuration)); cachedShimService.applyConfiguration(configuration); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0b799806/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/shaded/ShadedSerializerAdapter.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/shaded/ShadedSerializerAdapter.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/shaded/ShadedSerializerAdapter.java index 28a44bd..fca19c7 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/shaded/ShadedSerializerAdapter.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/shaded/ShadedSerializerAdapter.java @@ -26,7 +26,7 @@ import org.apache.tinkerpop.shaded.kryo.io.Output; public class ShadedSerializerAdapter<T> extends Serializer<T> { - SerializerShim<T> serializer; + private final SerializerShim<T> serializer; public ShadedSerializerAdapter(final SerializerShim<T> 
serializer) { this.serializer = serializer; @@ -51,4 +51,8 @@ public class ShadedSerializerAdapter<T> extends Serializer<T> { final ShadedInputAdapter shadedInputAdapter = new ShadedInputAdapter(input); return serializer.read(shadedKryoAdapter, shadedInputAdapter, aClass); } + + public SerializerShim<T> getSerializerShim() { + return this.serializer; + } } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0b799806/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java index 9c6a352..c95ede5 100644 --- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java +++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java @@ -111,7 +111,6 @@ public class HadoopGraphProvider extends AbstractGraphProvider { @Override public Map<String, Object> getBaseConfiguration(final String graphName, final Class<?> test, final String testMethodName, final LoadGraphWith.GraphData loadGraphWith) { - System.clearProperty(KRYO_SHIM_SERVICE); this.graphSONInput = RANDOM.nextBoolean(); return new HashMap<String, Object>() {{ put(Graph.GRAPH, HadoopGraph.class.getName()); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0b799806/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/CompactBufferSerializer.groovy ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/CompactBufferSerializer.groovy b/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/CompactBufferSerializer.groovy index be491c4..693ced6 100644 --- 
a/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/CompactBufferSerializer.groovy +++ b/spark-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/CompactBufferSerializer.groovy @@ -44,10 +44,8 @@ public final class CompactBufferSerializer<T> extends Serializer<CompactBuffer<T kryo.writeClassAndObject(output, compactBuffer.evidence$1); kryo.writeClassAndObject(output, compactBuffer.element0); kryo.writeClassAndObject(output, compactBuffer.element1); - output.flush(); output.writeVarInt(compactBuffer.org$apache$spark$util$collection$CompactBuffer$$curSize, true); kryo.writeClassAndObject(output, compactBuffer.otherElements); - output.flush(); } @Override http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0b799806/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java index 6d9b536..ba6d001 100644 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java @@ -31,41 +31,46 @@ import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.Un import org.apache.tinkerpop.gremlin.structure.io.IoRegistry; import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool; import org.apache.tinkerpop.gremlin.structure.io.gryo.TypeRegistration; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.shaded.ShadedSerializerAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.Arrays; 
+import java.util.List; /** * A {@link KryoSerializer} that attempts to honor {@link GryoPool#CONFIG_IO_REGISTRY}. */ -public class IoRegistryAwareKryoSerializer extends KryoSerializer { - - private final SparkConf configuration; +public final class IoRegistryAwareKryoSerializer extends KryoSerializer { private static final Logger log = LoggerFactory.getLogger(IoRegistryAwareKryoSerializer.class); + private final List<TypeRegistration<?>> typeRegistrations = new ArrayList<>(); + public IoRegistryAwareKryoSerializer(final SparkConf configuration) { super(configuration); - // store conf so that we can access its registry (if one is present) in newKryo() - this.configuration = configuration; + if (!configuration.contains(GryoPool.CONFIG_IO_REGISTRY)) + log.info("SparkConf does not contain a {} property. Skipping {} processing.", GryoPool.CONFIG_IO_REGISTRY, IoRegistry.class.getCanonicalName()); + else { + final GryoPool pool = GryoPool.build().poolSize(1).ioRegistries(Arrays.asList(configuration.get(GryoPool.CONFIG_IO_REGISTRY).split(","))).create(); + for (final TypeRegistration<?> type : pool.getMapper().getTypeRegistrations()) { + log.info("Registering {} with serializer type: {}", type.getTargetClass().getCanonicalName(), type); + this.typeRegistrations.add(type); + } + } } @Override public Kryo newKryo() { final Kryo kryo = super.newKryo(); - return applyIoRegistryIfPresent(kryo); - } - - private Kryo applyIoRegistryIfPresent(final Kryo kryo) { - if (!this.configuration.contains(GryoPool.CONFIG_IO_REGISTRY)) { - log.info("SparkConf does not contain setting {}, skipping {} handling", GryoPool.CONFIG_IO_REGISTRY, IoRegistry.class.getCanonicalName()); - return kryo; - } - final GryoPool pool = GryoPool.build().poolSize(1).ioRegistries(Arrays.asList(this.configuration.get(GryoPool.CONFIG_IO_REGISTRY).split(","))).create(); - for (final TypeRegistration<?> type : pool.getMapper().getTypeRegistrations()) { - log.info("Registering {} with serializer {} and id {}", 
type.getTargetClass().getCanonicalName(), type.getSerializerShim(), type.getId()); - kryo.register(type.getTargetClass(), new UnshadedSerializerAdapter<>(type.getSerializerShim()), type.getId()); + for (final TypeRegistration<?> type : this.typeRegistrations) { + if (null != type.getSerializerShim()) + kryo.register(type.getTargetClass(), new UnshadedSerializerAdapter(type.getSerializerShim()), type.getId()); + else if (null != type.getShadedSerializer() && type.getShadedSerializer() instanceof ShadedSerializerAdapter) + kryo.register(type.getTargetClass(), new UnshadedSerializerAdapter(((ShadedSerializerAdapter) type.getShadedSerializer()).getSerializerShim()), type.getId()); + else + kryo.register(type.getTargetClass(), type.getId()); } return kryo; } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0b799806/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/ObjectWritableSerializer.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/ObjectWritableSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/ObjectWritableSerializer.java index 01be50d..2ec9615 100644 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/ObjectWritableSerializer.java +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/ObjectWritableSerializer.java @@ -37,7 +37,6 @@ public final class ObjectWritableSerializer<T> implements SerializerShim<ObjectW @Override public <O extends OutputShim> void write(final KryoShim<?, O> kryo, final O output, final ObjectWritable<T> starGraph) { kryo.writeClassAndObject(output, starGraph.get()); - output.flush(); } @Override http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0b799806/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/Tuple2Serializer.java 
---------------------------------------------------------------------- diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/Tuple2Serializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/Tuple2Serializer.java index 05d0b9e..c610286 100644 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/Tuple2Serializer.java +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/Tuple2Serializer.java @@ -34,9 +34,7 @@ public final class Tuple2Serializer<A, B> extends Serializer<Tuple2<A, B>> { @Override public void write(final Kryo kryo, final Output output, final Tuple2<A, B> tuple2) { kryo.writeClassAndObject(output, tuple2._1()); - output.flush(); kryo.writeClassAndObject(output, tuple2._2()); - output.flush(); } @Override http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0b799806/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/Tuple3Serializer.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/Tuple3Serializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/Tuple3Serializer.java index f188794..46a7d02 100644 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/Tuple3Serializer.java +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/Tuple3Serializer.java @@ -33,11 +33,8 @@ public final class Tuple3Serializer<A, B, C> extends Serializer<Tuple3<A, B, C>> @Override public void write(final Kryo kryo, final Output output, final Tuple3<A, B, C> tuple3) { kryo.writeClassAndObject(output, tuple3._1()); - output.flush(); kryo.writeClassAndObject(output, tuple3._2()); - output.flush(); kryo.writeClassAndObject(output, tuple3._3()); - output.flush(); } @Override 
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0b799806/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/VertexWritableSerializer.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/VertexWritableSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/VertexWritableSerializer.java index c89fb05..93c86d8 100644 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/VertexWritableSerializer.java +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/VertexWritableSerializer.java @@ -38,7 +38,6 @@ public final class VertexWritableSerializer implements SerializerShim<VertexWrit @Override public <O extends OutputShim> void write(final KryoShim<?, O> kryo, final O output, final VertexWritable vertexWritable) { kryo.writeObject(output, vertexWritable.get().graph()); - output.flush(); } @Override http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0b799806/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/WrappedArraySerializer.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/WrappedArraySerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/WrappedArraySerializer.java index 8de1955..803a19c 100644 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/WrappedArraySerializer.java +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/WrappedArraySerializer.java @@ -36,7 +36,6 @@ public final class WrappedArraySerializer<T> extends Serializer<WrappedArray<T>> output.writeVarInt(iterable.size(), true); JavaConversions.asJavaCollection(iterable).forEach(t -> { 
kryo.writeClassAndObject(output, t); - output.flush(); }); } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0b799806/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedSerializerAdapter.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedSerializerAdapter.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedSerializerAdapter.java index a5f8b05..452c47a 100644 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedSerializerAdapter.java +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedSerializerAdapter.java @@ -16,12 +16,6 @@ * specific language governing permissions and limitations * under the License. */ - -/** - * Copyright DataStax, Inc. - * - * Please see the included license file for details. 
- */ package org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded; import com.esotericsoftware.kryo.Kryo; @@ -30,10 +24,9 @@ import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.SerializerShim; -public class UnshadedSerializerAdapter<T> extends Serializer<T> -{ +public class UnshadedSerializerAdapter<T> extends Serializer<T> { - SerializerShim<T> serializer; + private final SerializerShim<T> serializer; public UnshadedSerializerAdapter(final SerializerShim<T> serializer) { this.serializer = serializer; @@ -48,10 +41,13 @@ public class UnshadedSerializerAdapter<T> extends Serializer<T> } @Override - public T read(final Kryo kryo, final Input input, final Class<T> aClass) - { + public T read(final Kryo kryo, final Input input, final Class<T> aClass) { UnshadedKryoAdapter shadedKryoAdapter = new UnshadedKryoAdapter(kryo); UnshadedInputAdapter shadedInputAdapter = new UnshadedInputAdapter(input); return serializer.read(shadedKryoAdapter, shadedInputAdapter, aClass); } + + public SerializerShim<T> getSerializerShim() { + return this.serializer; + } } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0b799806/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/AbstractSparkTest.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/AbstractSparkTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/AbstractSparkTest.java index 6d2231f..ab2cf2f 100644 --- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/AbstractSparkTest.java +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/AbstractSparkTest.java @@ -24,6 +24,7 @@ import org.apache.commons.configuration.Configuration; import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; import org.apache.spark.api.java.JavaSparkContext; 
+import org.apache.spark.launcher.SparkLauncher; import org.apache.tinkerpop.gremlin.hadoop.Constants; import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph; import org.apache.tinkerpop.gremlin.spark.structure.Spark; @@ -45,7 +46,7 @@ public abstract class AbstractSparkTest { public void setupTest() { SparkConf sparkConfiguration = new SparkConf(); sparkConfiguration.setAppName(this.getClass().getCanonicalName() + "-setupTest"); - sparkConfiguration.set("spark.master", "local[4]"); + sparkConfiguration.set(SparkLauncher.SPARK_MASTER, "local[4]"); JavaSparkContext sparkContext = new JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration)); sparkContext.close(); Spark.create(sparkContext.sc()); @@ -56,9 +57,9 @@ public abstract class AbstractSparkTest { protected Configuration getBaseConfiguration() { final BaseConfiguration configuration = new BaseConfiguration(); configuration.setDelimiterParsingDisabled(true); - configuration.setProperty("spark.master", "local[4]"); + configuration.setProperty(SparkLauncher.SPARK_MASTER, "local[4]"); configuration.setProperty(Constants.SPARK_SERIALIZER, GryoSerializer.class.getCanonicalName()); - configuration.setProperty("spark.kryo.registrationRequired", true); + configuration.setProperty(Constants.SPARK_KRYO_REGISTRATION_REQUIRED, true); configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName()); configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false); return configuration; http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0b799806/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoIoRegistryTest.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoIoRegistryTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoIoRegistryTest.java new file mode 100644 index 0000000..0260d02 
--- /dev/null +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoIoRegistryTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.spark.structure.io.gryo; + +import org.apache.commons.configuration.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; +import org.apache.spark.serializer.KryoSerializer; +import org.apache.tinkerpop.gremlin.TestHelper; +import org.apache.tinkerpop.gremlin.hadoop.Constants; +import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph; +import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorage; +import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable; +import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat; +import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat; +import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoRecordWriter; +import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil; +import org.apache.tinkerpop.gremlin.spark.AbstractSparkTest; +import 
org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer; +import org.apache.tinkerpop.gremlin.structure.T; +import org.apache.tinkerpop.gremlin.structure.Vertex; +import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool; +import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph; +import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; +import org.junit.Test; + +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public class GryoIoRegistryTest extends AbstractSparkTest { + + @Test + public void shouldSupportIoRegistry() throws Exception { + final File input = TestHelper.generateTempFile(this.getClass(), "input", ".kryo"); + final Configuration configuration = super.getBaseConfiguration(); + configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, input.getAbsolutePath()); + configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_READER, GryoInputFormat.class.getCanonicalName()); + configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER, GryoOutputFormat.class.getCanonicalName()); + configuration.setProperty(GryoPool.CONFIG_IO_REGISTRY, TestIoRegistry.class.getCanonicalName()); + //configuration.setProperty(Constants.SPARK_SERIALIZER, GryoSerializer.class.getCanonicalName()); + configuration.setProperty(Constants.SPARK_SERIALIZER, KryoSerializer.class.getCanonicalName()); + configuration.setProperty(Constants.SPARK_KRYO_REGISTRATOR, GryoRegistrator.class.getCanonicalName()); + + HadoopGraph graph = HadoopGraph.open(configuration); + + final GryoRecordWriter writer = new GryoRecordWriter(new DataOutputStream(new FileOutputStream(input)), ConfUtil.makeHadoopConfiguration(configuration)); + for (int i = 0; i < 10; i++) { + final StarGraph starGraph = StarGraph.open(); + 
starGraph.addVertex(T.label, "place", T.id, i, "point", new ToyPoint(i, i * 10), "message", "I'm " + i); + writer.write(NullWritable.get(), new VertexWritable(starGraph.getStarVertex())); + } + writer.close(new TaskAttemptContextImpl(ConfUtil.makeHadoopConfiguration(configuration), new TaskAttemptID())); + // OLAP TESTING // + final List<ToyPoint> points = graph.traversal().withComputer(SparkGraphComputer.class).V().<ToyPoint>values("point").toList(); + assertEquals(10, points.size()); + for (int i = 0; i < 10; i++) { + assertTrue(points.contains(new ToyPoint(i, i * 10))); + } + points.clear(); + // OLTP TESTING // + graph.traversal().V().<ToyPoint>values("point").fill(points); + assertEquals(10, points.size()); + for (int i = 0; i < 10; i++) { + assertTrue(points.contains(new ToyPoint(i, i * 10))); + } + points.clear(); + // HDFS TESTING // + final List<Vertex> list = IteratorUtils.asList(FileSystemStorage.open(ConfUtil.makeHadoopConfiguration(configuration)).head(input.getAbsolutePath(), GryoInputFormat.class)); + list.forEach(v -> points.add(v.value("point"))); + assertEquals(10, points.size()); + for (int i = 0; i < 10; i++) { + assertTrue(points.contains(new ToyPoint(i, i * 10))); + } + } +} http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0b799806/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/TestIoRegistry.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/TestIoRegistry.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/TestIoRegistry.java new file mode 100644 index 0000000..9a78aab --- /dev/null +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/TestIoRegistry.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.tinkerpop.gremlin.spark.structure.io.gryo; + +import org.apache.tinkerpop.gremlin.structure.io.AbstractIoRegistry; +import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoIo; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.shaded.ShadedSerializerAdapter; + +/** + * @author Marko A. 
Rodriguez (http://markorodriguez.com) + */ +public final class TestIoRegistry extends AbstractIoRegistry { + + private static final TestIoRegistry INSTANCE = new TestIoRegistry(); + + private TestIoRegistry() { + super.register(GryoIo.class, ToyPoint.class, new ShadedSerializerAdapter<>(new ToyPoint.ToyPointSerializer())); + } + + public static TestIoRegistry getInstance() { + return INSTANCE; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/0b799806/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/ToyPoint.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/ToyPoint.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/ToyPoint.java new file mode 100644 index 0000000..e46e9c3 --- /dev/null +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/ToyPoint.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.tinkerpop.gremlin.spark.structure.io.gryo; + +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.InputShim; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShim; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.OutputShim; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.SerializerShim; + +import java.io.Serializable; + +/** + * @author Marko A. Rodriguez (http://markorodriguez.com) + */ +public final class ToyPoint implements Serializable { + + private final int x; + private final int y; + + public ToyPoint(final int x, final int y) { + this.x = x; + this.y = y; + } + + public int getX() { + return this.x; + } + + public int getY() { + return this.y; + } + + public int hashCode() { + return this.x + this.y; + } + + public boolean equals(final Object other) { + return other instanceof ToyPoint && ((ToyPoint) other).x == this.x && ((ToyPoint) other).y == this.y; + } + + @Override + public String toString() { + return "[" + this.x + "," + this.y + "]"; + } + + public static class ToyPointSerializer implements SerializerShim<ToyPoint> { + @Override + public <O extends OutputShim> void write(final KryoShim<?, O> kryo, final O output, final ToyPoint toyPoint) { + output.writeInt(toyPoint.x); + output.writeInt(toyPoint.y); + } + + @Override + public <I extends InputShim> ToyPoint read(final KryoShim<I, ?> kryo, final I input, final Class<ToyPoint> toyPointClass) { + return new ToyPoint(input.readInt(), input.readInt()); + } + } +}
