VertexProgramHelper now falls back to Gryo to serialize and deserialize objects if the standard Java Serializer fails. Moving forward (3.3.x), this should all be accomplished by KryoShimServiceLoader.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/797364cb Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/797364cb Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/797364cb Branch: refs/heads/tp31 Commit: 797364cb4b5c4d0bd18d59a3a7c6cdb5603e136c Parents: e700363 Author: Marko A. Rodriguez <okramma...@gmail.com> Authored: Mon Jun 6 16:37:38 2016 -0600 Committer: Marko A. Rodriguez <okramma...@gmail.com> Committed: Mon Jun 6 16:37:38 2016 -0600 ---------------------------------------------------------------------- .../computer/util/VertexProgramHelper.java | 33 +++++++++++++++----- .../gremlin/structure/io/gryo/GryoMapper.java | 17 +++++----- .../gremlin/hadoop/HadoopGraphProvider.java | 4 +++ .../structure/io/gryo/GryoRegistrator.java | 24 +++----------- 4 files changed, 45 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/797364cb/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/VertexProgramHelper.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/VertexProgramHelper.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/VertexProgramHelper.java index bc67866..2b3a0b2 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/VertexProgramHelper.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/VertexProgramHelper.java @@ -25,8 +25,13 @@ import org.apache.tinkerpop.gremlin.process.traversal.Step; import org.apache.tinkerpop.gremlin.process.traversal.Traversal; import org.apache.tinkerpop.gremlin.process.traversal.step.map.EdgeVertexStep; import 
org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep; +import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader; import org.apache.tinkerpop.gremlin.util.Serializer; +import org.apache.tinkerpop.shaded.kryo.io.Input; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.Arrays; import java.util.HashSet; @@ -37,6 +42,8 @@ import java.util.Set; */ public final class VertexProgramHelper { + private static final GryoPool GRYO_POOL = GryoPool.build().create(); + private VertexProgramHelper() { } @@ -67,21 +74,33 @@ public final class VertexProgramHelper { final String byteString = Arrays.toString(Serializer.serializeObject(object)); configuration.setProperty(key, byteString.substring(1, byteString.length() - 1)); } catch (final IOException e) { - throw new IllegalArgumentException(e.getMessage(), e); + try { + final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + GRYO_POOL.doWithWriter(kryo -> kryo.writeObject(outputStream, object)); + String byteString = Arrays.toString(outputStream.toByteArray()); + configuration.setProperty(key, byteString.substring(1, byteString.length() - 1)); + } catch (final Exception e1) { + throw new IllegalArgumentException(e1.getMessage(), e1); + } } } public static <T> T deserialize(final Configuration configuration, final String key) { + final String[] stringBytes = configuration.getString(key).split(","); + byte[] bytes = new byte[stringBytes.length]; + for (int i = 0; i < stringBytes.length; i++) { + bytes[i] = Byte.valueOf(stringBytes[i].trim()); + } try { - final String[] stringBytes = configuration.getString(key).split(","); - byte[] bytes = new byte[stringBytes.length]; - for (int i = 0; i < stringBytes.length; i++) { - bytes[i] = Byte.valueOf(stringBytes[i].trim()); - } return (T) Serializer.deserializeObject(bytes); } catch (final 
IOException | ClassNotFoundException e) { - throw new IllegalArgumentException(e.getMessage(), e); + try { + return (T) GRYO_POOL.readWithKryo(kryo -> kryo.readClassAndObject(new Input(new ByteArrayInputStream(bytes)))); + } catch (final Exception e1) { + throw new IllegalArgumentException(e1.getMessage(), e1); + } } + } public static <S, E> Traversal.Admin<S, E> reverse(final Traversal.Admin<S, E> traversal) { http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/797364cb/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java index 41ca44d..7bf9b7d 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java @@ -48,6 +48,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSe import org.apache.tinkerpop.gremlin.process.traversal.util.DefaultTraversalMetrics; import org.apache.tinkerpop.gremlin.process.traversal.util.ImmutableMetrics; import org.apache.tinkerpop.gremlin.process.traversal.util.MutableMetrics; +import org.apache.tinkerpop.gremlin.process.traversal.util.PureTraversal; import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalExplanation; import org.apache.tinkerpop.gremlin.structure.Direction; import org.apache.tinkerpop.gremlin.structure.Edge; @@ -330,6 +331,8 @@ public final class GryoMapper implements Mapper<Kryo> { add(GryoTypeReg.of(AtomicLong.class, 79)); add(GryoTypeReg.of(Pair.class, 88, new PairSerializer())); add(GryoTypeReg.of(TraversalExplanation.class, 106, new JavaSerializer())); + add(GryoTypeReg.of(GraphFilter.class, 120, new JavaSerializer())); // ***LAST 
ID*** + //add(GryoTypeReg.of(PureTraversal.class, 121, new JavaSerializer())); add(GryoTypeReg.of(Duration.class, 93, new JavaTimeSerializers.DurationSerializer())); add(GryoTypeReg.of(Instant.class, 94, new JavaTimeSerializers.InstantSerializer())); @@ -355,7 +358,7 @@ public final class GryoMapper implements Mapper<Kryo> { add(GryoTypeReg.of(GroupStepV3d0.GroupBiOperatorV3d0.class, 113)); add(GryoTypeReg.of(RangeGlobalStep.RangeBiOperator.class, 114)); add(GryoTypeReg.of(OrderGlobalStep.OrderBiOperator.class, 118, new JavaSerializer())); // because they contain traversals - add(GryoTypeReg.of(ProfileStep.ProfileBiOperator.class, 119)); // ***LAST ID*** + add(GryoTypeReg.of(ProfileStep.ProfileBiOperator.class, 119)); }}; private final List<IoRegistry> registries = new ArrayList<>(); @@ -374,12 +377,12 @@ public final class GryoMapper implements Mapper<Kryo> { // For justification of these default registration rules, see TinkerPopKryoRegistrator for (TypeRegistration<?> tr : typeRegistrations) { if (tr.hasSerializer() /* no serializer is acceptable */ && - null == tr.getSerializerShim() /* a shim serializer is acceptable */ && - !(tr.getShadedSerializer() instanceof JavaSerializer) /* shaded JavaSerializer is acceptable */) { + null == tr.getSerializerShim() /* a shim serializer is acceptable */ && + !(tr.getShadedSerializer() instanceof JavaSerializer) /* shaded JavaSerializer is acceptable */) { // everything else is invalid String msg = String.format("The default GryoMapper type registration %s is invalid. " + - "It must supply either an implementation of %s or %s, but supplies neither. " + - "This is probably a bug in GryoMapper's default serialization class registrations.", tr, + "It must supply either an implementation of %s or %s, but supplies neither. 
" + + "This is probably a bug in GryoMapper's default serialization class registrations.", tr, SerializerShim.class.getCanonicalName(), JavaSerializer.class.getCanonicalName()); throw new IllegalStateException(msg); } @@ -553,8 +556,8 @@ public final class GryoMapper implements Mapper<Kryo> { if (1 < serializerCount) { String msg = String.format( "GryoTypeReg accepts at most one kind of serializer, but multiple " + - "serializers were supplied for class %s (id %s). " + - "Shaded serializer: %s. Shim serializer: %s. Shaded serializer function: %s.", + "serializers were supplied for class %s (id %s). " + + "Shaded serializer: %s. Shim serializer: %s. Shaded serializer function: %s.", this.clazz.getCanonicalName(), id, this.shadedSerializer, this.serializerShim, this.functionOfShadedKryo); throw new IllegalArgumentException(msg); http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/797364cb/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java index da00463..57157db 100644 --- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java +++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java @@ -28,6 +28,7 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph; import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopProperty; import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopVertex; import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopVertexProperty; +import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPoolShimService; import org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONInputFormat; import 
org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat; import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat; @@ -45,6 +46,8 @@ import java.util.Map; import java.util.Random; import java.util.Set; +import static org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader.SHIM_CLASS_SYSTEM_PROPERTY; + /** * @author Marko A. Rodriguez (http://markorodriguez.com) * @author Stephen Mallette (http://stephen.genoprime.com) @@ -109,6 +112,7 @@ public class HadoopGraphProvider extends AbstractGraphProvider { @Override public Map<String, Object> getBaseConfiguration(final String graphName, final Class<?> test, final String testMethodName, final LoadGraphWith.GraphData loadGraphWith) { + System.setProperty(SHIM_CLASS_SYSTEM_PROPERTY, HadoopPoolShimService.class.getCanonicalName()); this.graphSONInput = RANDOM.nextBoolean(); return new HashMap<String, Object>() {{ put(Graph.GRAPH, HadoopGraph.class.getName()); http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/797364cb/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoRegistrator.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoRegistrator.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoRegistrator.java index 9563408..68112d7 100644 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoRegistrator.java +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoRegistrator.java @@ -22,8 +22,6 @@ import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.Serializer; import com.esotericsoftware.kryo.serializers.JavaSerializer; import org.apache.spark.serializer.KryoRegistrator; -import org.apache.spark.util.SerializableConfiguration; -import 
org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration; import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopEdge; import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopProperty; import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopVertex; @@ -31,37 +29,27 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopVertexProperty; import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable; import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable; import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph; -import org.apache.tinkerpop.gremlin.process.traversal.Path; import org.apache.tinkerpop.gremlin.process.traversal.step.util.ImmutablePath; import org.apache.tinkerpop.gremlin.process.traversal.step.util.MutablePath; -import org.apache.tinkerpop.gremlin.process.traversal.traverser.B_LP_O_P_S_SE_SL_Traverser; -import org.apache.tinkerpop.gremlin.process.traversal.traverser.B_LP_O_S_SE_SL_Traverser; -import org.apache.tinkerpop.gremlin.process.traversal.traverser.B_O_S_SE_SL_Traverser; -import org.apache.tinkerpop.gremlin.process.traversal.traverser.B_O_Traverser; -import org.apache.tinkerpop.gremlin.process.traversal.traverser.LP_O_OB_P_S_SE_SL_Traverser; -import org.apache.tinkerpop.gremlin.process.traversal.traverser.LP_O_OB_S_SE_SL_Traverser; -import org.apache.tinkerpop.gremlin.process.traversal.traverser.O_OB_S_SE_SL_Traverser; -import org.apache.tinkerpop.gremlin.process.traversal.traverser.O_Traverser; import org.apache.tinkerpop.gremlin.spark.process.computer.payload.MessagePayload; import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload; import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewOutgoingPayload; import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewPayload; import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedSerializerAdapter; -import org.apache.tinkerpop.gremlin.structure.Edge; 
-import org.apache.tinkerpop.gremlin.structure.Property; -import org.apache.tinkerpop.gremlin.structure.Vertex; -import org.apache.tinkerpop.gremlin.structure.VertexProperty; import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoMapper; import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoSerializers; import org.apache.tinkerpop.gremlin.structure.io.gryo.TypeRegistration; import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.SerializerShim; import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph; -import org.apache.tinkerpop.gremlin.structure.util.star.StarGraphSerializer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.collection.mutable.WrappedArray; -import java.util.*; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Set; /** * A spark.kryo.registrator implementation that installs TinkerPop types. @@ -205,8 +193,6 @@ public class GryoRegistrator implements KryoRegistrator { m.put(VertexWritable.class, new UnshadedSerializerAdapter<>(new VertexWritableSerializer())); m.put(ObjectWritable.class, new UnshadedSerializerAdapter<>(new ObjectWritableSerializer<>())); // - m.put(HadoopConfiguration.class, null); - // m.put(HadoopVertex.class, new UnshadedSerializerAdapter<>(new GryoSerializers.VertexSerializer())); m.put(HadoopVertexProperty.class, new UnshadedSerializerAdapter<>(new GryoSerializers.VertexPropertySerializer())); m.put(HadoopProperty.class, new UnshadedSerializerAdapter<>(new GryoSerializers.PropertySerializer()));