http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/218d7909/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraphSerializer.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraphSerializer.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraphSerializer.java index d5ba90d..431e1eb 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraphSerializer.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraphSerializer.java @@ -27,11 +27,21 @@ import org.apache.tinkerpop.gremlin.structure.Direction; import org.apache.tinkerpop.gremlin.structure.Edge; import org.apache.tinkerpop.gremlin.structure.T; import org.apache.tinkerpop.gremlin.structure.VertexProperty; -import org.apache.tinkerpop.gremlin.structure.io.kryoshim.InputShim; -import org.apache.tinkerpop.gremlin.structure.io.kryoshim.KryoShim; -import org.apache.tinkerpop.gremlin.structure.io.kryoshim.OutputShim; -import org.apache.tinkerpop.gremlin.structure.io.kryoshim.SerializerShim; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.InputShim; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShim; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.OutputShim; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.SerializerShim; +/** + * Kryo serializer for {@link StarGraph}. Implements an internal versioning capability for backward compatibility. + * The single byte at the front of the serialization stream denotes the version. That version can be used to choose + * the correct deserialization mechanism. The limitation is that this versioning won't help with backward + * compatibility for custom serializers from providers. Providers should be encouraged to write their serializers + * with backward compatibility in mind. + * + * @author Marko A. Rodriguez (http://markorodriguez.com) + * @author Stephen Mallette (http://stephen.genoprime.com) + */ public class StarGraphSerializer implements SerializerShim<StarGraph> { private final Direction edgeDirectionToSerialize;
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/218d7909/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java ---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java index 2053280..c19b914 100644 --- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java +++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java @@ -18,7 +18,7 @@ */ package org.apache.tinkerpop.gremlin.hadoop.structure.io; -import org.apache.tinkerpop.gremlin.structure.io.kryoshim.KryoShimService; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService; import org.apache.tinkerpop.shaded.kryo.Kryo; import org.apache.tinkerpop.shaded.kryo.io.Input; import org.apache.tinkerpop.shaded.kryo.io.Output; http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/218d7909/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritable.java ---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritable.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritable.java index e7a38a5..88f7ee1 100644 --- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritable.java +++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritable.java @@ -21,11 +21,9 @@ package org.apache.tinkerpop.gremlin.hadoop.structure.io; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableUtils; import org.apache.tinkerpop.gremlin.process.computer.MapReduce; -import org.apache.tinkerpop.gremlin.structure.io.kryoshim.KryoShimServiceLoader; -import org.apache.tinkerpop.shaded.kryo.io.Output; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader; import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/218d7909/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java ---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java index 7ac8e8c..2252ded 100644 --- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java +++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritable.java @@ -21,12 +21,11 @@ package org.apache.tinkerpop.gremlin.hadoop.structure.io; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import org.apache.tinkerpop.gremlin.structure.Vertex; -import org.apache.tinkerpop.gremlin.structure.io.kryoshim.KryoShimServiceLoader; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader; import org.apache.tinkerpop.gremlin.structure.util.ElementHelper; import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph; import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/218d7909/hadoop-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService ---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService b/hadoop-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService new file mode 100644 index 0000000..0b27e72 --- /dev/null +++ b/hadoop-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService @@ -0,0 +1 @@ +org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPoolShimService # HadoopPools provides/caches instances of TinkerPop's shaded Kryo http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/218d7909/hadoop-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.kryoshim.KryoShimService ---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.kryoshim.KryoShimService b/hadoop-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.kryoshim.KryoShimService deleted file mode 100644 index 0b27e72..0000000 --- a/hadoop-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.kryoshim.KryoShimService +++ /dev/null @@ -1 +0,0 @@ -org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPoolShimService # HadoopPools provides/caches instances of TinkerPop's shaded Kryo http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/218d7909/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/TinkerPopKryoRegistrator.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/TinkerPopKryoRegistrator.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/TinkerPopKryoRegistrator.java new file mode 100644 index 0000000..4c99e70 --- /dev/null +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/TinkerPopKryoRegistrator.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.tinkerpop.gremlin.spark.structure.io; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.serializers.JavaSerializer; +import com.google.common.base.Preconditions; +import org.apache.spark.serializer.KryoRegistrator; +import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable; +import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable; +import org.apache.tinkerpop.gremlin.process.traversal.step.map.GroupStep; +import org.apache.tinkerpop.gremlin.process.traversal.step.map.OrderGlobalStep; +import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalExplanation; +import org.apache.tinkerpop.gremlin.spark.process.computer.payload.MessagePayload; +import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload; +import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewOutgoingPayload; +import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewPayload; +import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.ObjectWritableSerializer; +import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.VertexWritableSerializer; +import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedSerializerAdapter; +import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoMapper; +import org.apache.tinkerpop.gremlin.structure.io.gryo.TypeRegistration; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.SerializerShim; +import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * A spark.kryo.registrator implementation that installs TinkerPop types. + * This is intended for use with spark.serializer=KryoSerializer, not GryoSerializer. + */ +public class TinkerPopKryoRegistrator implements KryoRegistrator { + + private static final Logger log = LoggerFactory.getLogger(TinkerPopKryoRegistrator.class); + + @Override + public void registerClasses(Kryo kryo) { + // TinkerPop type registrations copied from GyroSerializer's constructor + kryo.register(MessagePayload.class); + kryo.register(ViewIncomingPayload.class); + kryo.register(ViewOutgoingPayload.class); + kryo.register(ViewPayload.class); + kryo.register(VertexWritable.class, new UnshadedSerializerAdapter<>(new VertexWritableSerializer())); + kryo.register(ObjectWritable.class, new UnshadedSerializerAdapter<>(new ObjectWritableSerializer<>())); + + Set<Class<?>> shimmedClasses = new HashSet<>(); + + Set<Class<?>> javaSerializationClasses = new HashSet<>(); + + // Copy GryoMapper's default registrations + for (TypeRegistration<?> tr : GryoMapper.build().create().getTypeRegistrations()) { + // Special case for JavaSerializer, which is generally implemented in terms of TinkerPop's + // problematic static GryoMapper/GryoSerializer pool (these are handled below the loop) + org.apache.tinkerpop.shaded.kryo.Serializer<?> shadedSerializer = tr.getShadedSerializer(); + SerializerShim<?> serializerShim = tr.getSerializerShim(); + if (null != shadedSerializer && + shadedSerializer.getClass().equals(org.apache.tinkerpop.shaded.kryo.serializers.JavaSerializer.class)) { + javaSerializationClasses.add(tr.getTargetClass()); + } else if (null != serializerShim) { + log.debug("Registering class {} to serializer shim {} (serializer shim class {})", + tr.getTargetClass(), serializerShim, serializerShim.getClass()); + kryo.register(tr.getTargetClass(), new UnshadedSerializerAdapter<>(serializerShim)); + shimmedClasses.add(tr.getTargetClass()); + } else { + // Register with the default behavior (FieldSerializer) + log.debug("Registering class {} with default serializer", tr.getTargetClass()); + kryo.register(tr.getTargetClass()); + } + } + + Map<Class<?>, Serializer<?>> javaSerializerReplacements = new HashMap<>(); + javaSerializerReplacements.put(GroupStep.GroupBiOperator.class, new JavaSerializer()); + javaSerializerReplacements.put(OrderGlobalStep.OrderBiOperator.class, null); + javaSerializerReplacements.put(TraversalExplanation.class, null); + + for (Map.Entry<Class<?>, Serializer<?>> e : javaSerializerReplacements.entrySet()) { + Class<?> c = e.getKey(); + Serializer<?> s = e.getValue(); + + if (javaSerializationClasses.remove(c)) { + if (null != s) { + log.debug("Registering class {} with serializer {}", c, s); + kryo.register(c, s); + } else { + log.debug("Registering class {} with default serializer", c); + kryo.register(c); + } + } else { + log.debug("Registering class {} with JavaSerializer", c); + kryo.register(c, new JavaSerializer()); + } + } + + // We really care about StarGraph's shim serializer, so make sure we registered it + if (!shimmedClasses.contains(StarGraph.class)) { + log.warn("No SerializerShim found for StarGraph"); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/218d7909/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/ObjectWritableSerializer.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/ObjectWritableSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/ObjectWritableSerializer.java index 21cbc60..4ceb045 100644 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/ObjectWritableSerializer.java +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/ObjectWritableSerializer.java @@ -20,6 +20,10 @@ package org.apache.tinkerpop.gremlin.spark.structure.io.gryo; import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.InputShim; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShim; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.OutputShim; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.SerializerShim; import org.apache.tinkerpop.shaded.kryo.Kryo; import org.apache.tinkerpop.shaded.kryo.Serializer; import org.apache.tinkerpop.shaded.kryo.io.Input; @@ -28,16 +32,16 @@ import org.apache.tinkerpop.shaded.kryo.io.Output; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public final class ObjectWritableSerializer<T> extends Serializer<ObjectWritable<T>> { +public final class ObjectWritableSerializer<T> implements SerializerShim<ObjectWritable<T>> { @Override - public void write(final Kryo kryo, final Output output, final ObjectWritable<T> objectWritable) { - kryo.writeClassAndObject(output, objectWritable.get()); + public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, ObjectWritable<T> starGraph) { + kryo.writeClassAndObject(output, starGraph.get()); output.flush(); } @Override - public ObjectWritable<T> read(final Kryo kryo, final Input input, final Class<ObjectWritable<T>> clazz) { + public <I extends InputShim> ObjectWritable<T> read(KryoShim<I, ?> kryo, I input, Class<ObjectWritable<T>> clazz) { return new ObjectWritable(kryo.readClassAndObject(input)); } } http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/218d7909/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/VertexWritableSerializer.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/VertexWritableSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/VertexWritableSerializer.java index 97891f3..f3c1b15 100644 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/VertexWritableSerializer.java +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/VertexWritableSerializer.java @@ -20,6 +20,10 @@ package org.apache.tinkerpop.gremlin.spark.structure.io.gryo; import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.InputShim; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShim; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.OutputShim; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.SerializerShim; import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph; import org.apache.tinkerpop.shaded.kryo.Kryo; import org.apache.tinkerpop.shaded.kryo.Serializer; @@ -29,14 +33,16 @@ import org.apache.tinkerpop.shaded.kryo.io.Output; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public final class VertexWritableSerializer extends Serializer<VertexWritable> { +public final class VertexWritableSerializer implements SerializerShim<VertexWritable> { + @Override - public void write(final Kryo kryo, final Output output, final VertexWritable vertexWritable) { + public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, VertexWritable vertexWritable) { kryo.writeObject(output, vertexWritable.get().graph()); + output.flush(); } @Override - public VertexWritable read(final Kryo kryo, final Input input, final Class<VertexWritable> aClass) { + public <I extends InputShim> VertexWritable read(KryoShim<I, ?> kryo, I input, Class<VertexWritable> clazz) { return new VertexWritable(kryo.readObject(input, StarGraph.class).getStarVertex()); } } http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/218d7909/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedInputAdapter.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedInputAdapter.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedInputAdapter.java new file mode 100644 index 0000000..c533af7 --- /dev/null +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedInputAdapter.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Copyright DataStax, Inc. + * + * Please see the included license file for details. + */ +package org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded; + +import com.esotericsoftware.kryo.io.Input; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.InputShim; + +public class UnshadedInputAdapter implements InputShim +{ + + private final Input unshadedInput; + + public UnshadedInputAdapter(Input unshadedInput) + { + this.unshadedInput = unshadedInput; + } + + Input getUnshadedInput() + { + return unshadedInput; + } + + @Override + public byte readByte() + { + return unshadedInput.readByte(); + } + + @Override + public byte[] readBytes(int size) { + return unshadedInput.readBytes(size); + } + + @Override + public String readString() + { + return unshadedInput.readString(); + } + + @Override + public long readLong() + { + return unshadedInput.readLong(); + } + + @Override + public int readInt() { + return unshadedInput.readInt(); + } + + @Override + public double readDouble() + { + return unshadedInput.readDouble(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/218d7909/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoAdapter.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoAdapter.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoAdapter.java new file mode 100644 index 0000000..b14abe0 --- /dev/null +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoAdapter.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Copyright DataStax, Inc. + * + * Please see the included license file for details. + */ +package org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded; + +import com.esotericsoftware.kryo.Kryo; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShim; + +public class UnshadedKryoAdapter implements KryoShim<UnshadedInputAdapter, UnshadedOutputAdapter> +{ + private final Kryo unshadedKryo; + + public UnshadedKryoAdapter(Kryo unshadedKryo) + { + this.unshadedKryo = unshadedKryo; + } + + @Override + public <T> T readObject(UnshadedInputAdapter input, Class<T> type) + { + return unshadedKryo.readObject(input.getUnshadedInput(), type); + } + + @Override + public Object readClassAndObject(UnshadedInputAdapter input) + { + return unshadedKryo.readClassAndObject(input.getUnshadedInput()); + } + + @Override + public void writeObject(UnshadedOutputAdapter output, Object object) + { + unshadedKryo.writeObject(output.getUnshadedOutput(), object); + } + + @Override + public void writeClassAndObject(UnshadedOutputAdapter output, Object object) + { + unshadedKryo.writeClassAndObject(output.getUnshadedOutput(), object); + } + + @Override + public <T> T readObjectOrNull(UnshadedInputAdapter input, Class<T> type) + { + return unshadedKryo.readObjectOrNull(input.getUnshadedInput(), type); + } + + @Override + public void writeObjectOrNull(UnshadedOutputAdapter output, Object object, Class type) + { + unshadedKryo.writeObjectOrNull(output.getUnshadedOutput(), object, type); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/218d7909/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java new file mode 100644 index 0000000..d0411e8 --- /dev/null +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Copyright DataStax, Inc. + * <p> + * Please see the included license file for details. + */ +package org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded; + +import com.twitter.chill.KryoInstantiator; +import com.twitter.chill.KryoPool; +import com.twitter.chill.SerDeState; +import org.apache.spark.SparkConf; +import org.apache.spark.serializer.KryoSerializer; +import org.apache.tinkerpop.gremlin.spark.structure.io.TinkerPopKryoRegistrator; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +public class UnshadedKryoShimService implements KryoShimService { + + public static final String SPARK_KRYO_POOL_SIZE_SYSTEM_PROPERTY = "tinkerpop.kryo.poolsize"; + + private static final Logger log = LoggerFactory.getLogger(UnshadedKryoShimService.class); + private static final int SPARK_KRYO_POOL_SIZE_DEFAULT = 8; + + private final KryoSerializer sparkKryoSerializer; + private final KryoPool kryoPool; + + public UnshadedKryoShimService() { + this(TinkerPopKryoRegistrator.class.getCanonicalName(), getDefaultKryoPoolSize()); + } + + public UnshadedKryoShimService(String sparkKryoRegistratorClassname, int kryoPoolSize) { + SparkConf sparkConf = new SparkConf(); + sparkConf.set("spark.serializer", KryoSerializer.class.getCanonicalName()); + sparkConf.set("spark.kryo.registrator", sparkKryoRegistratorClassname); + sparkKryoSerializer = new KryoSerializer(sparkConf); + kryoPool = KryoPool.withByteArrayOutputStream(kryoPoolSize, new KryoInstantiator()); + } + + @Override + public Object readClassAndObject(InputStream source) { + SerDeState sds = null; + try { + sds = kryoPool.borrow(); + + sds.setInput(source); + + return sds.readClassAndObject(); + } finally { + kryoPool.release(sds); + } + } + + @Override + public void writeClassAndObject(Object o, OutputStream sink) { + SerDeState sds = null; + try { + sds = kryoPool.borrow(); + + sds.writeClassAndObject(o); // this writes to an internal buffer + + sds.writeOutputTo(sink); // this copies the internal buffer to sink + + sink.flush(); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + kryoPool.release(sds); + } + } + + @Override + public int getPriority() { + return 1024; + } + + private static int getDefaultKryoPoolSize() { + String raw = System.getProperty(SPARK_KRYO_POOL_SIZE_SYSTEM_PROPERTY); + + int size = SPARK_KRYO_POOL_SIZE_DEFAULT; + try { + size = Integer.valueOf(raw); + log.info("Setting kryo pool size to {} according to system property {}", size, + SPARK_KRYO_POOL_SIZE_SYSTEM_PROPERTY); + } catch (NumberFormatException e) { + log.error("System property {}={} could not be parsed as an integer, using default value {}", + SPARK_KRYO_POOL_SIZE_SYSTEM_PROPERTY, raw, size, e); + } + + return size; + } +} http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/218d7909/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedOutputAdapter.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedOutputAdapter.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedOutputAdapter.java new file mode 100644 index 0000000..9cc59d4 --- /dev/null +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedOutputAdapter.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Copyright DataStax, Inc. + * + * Please see the included license file for details. + */ +package org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded; + +import com.esotericsoftware.kryo.io.Output; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.OutputShim; + +public class UnshadedOutputAdapter implements OutputShim +{ + private final Output unshadedOutput; + + public UnshadedOutputAdapter(Output unshadedOutput) + { + this.unshadedOutput = unshadedOutput; + } + + Output getUnshadedOutput() + { + return unshadedOutput; + } + + @Override + public void writeByte(byte b) + { + unshadedOutput.writeByte(b); + } + + @Override + public void writeBytes(byte[] array, int offset, int count) { + unshadedOutput.writeBytes(array, offset, count); + } + + @Override + public void writeString(String s) + { + unshadedOutput.writeString(s); + } + + @Override + public void writeLong(long l) + { + unshadedOutput.writeLong(l); + } + + @Override + public void writeInt(int i) { + unshadedOutput.writeInt(i); + } + + @Override + public void writeDouble(double d) + { + unshadedOutput.writeDouble(d); + } + + @Override + public void flush() + { + unshadedOutput.flush(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/218d7909/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedSerializerAdapter.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedSerializerAdapter.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedSerializerAdapter.java new file mode 100644 index 0000000..efc9a4f --- /dev/null +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedSerializerAdapter.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Copyright DataStax, Inc. + * + * Please see the included license file for details. + */ +package org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.SerializerShim; + +public class UnshadedSerializerAdapter<T> extends Serializer<T> +{ + + SerializerShim<T> serializer; + + public UnshadedSerializerAdapter(SerializerShim<T> serializer) { + this.serializer = serializer; + setImmutable(this.serializer.isImmutable()); + } + + @Override + public void write(Kryo kryo, Output output, T t) { + UnshadedKryoAdapter shadedKryoAdapter = new UnshadedKryoAdapter(kryo); + UnshadedOutputAdapter shadedOutputAdapter = new UnshadedOutputAdapter(output); + serializer.write(shadedKryoAdapter, shadedOutputAdapter, t); + } + + @Override + public T read(Kryo kryo, Input input, Class<T> aClass) + { + UnshadedKryoAdapter shadedKryoAdapter = new UnshadedKryoAdapter(kryo); + UnshadedInputAdapter shadedInputAdapter = new UnshadedInputAdapter(input); + return serializer.read(shadedKryoAdapter, shadedInputAdapter, aClass); + } +}