Kryo shim configuration tweaks
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/9321a3e1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/9321a3e1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/9321a3e1 Branch: refs/heads/TINKERPOP-1321 Commit: 9321a3e14eab4ed05f8ef5f4c77de481a4011b81 Parents: 218d790 Author: Dan LaRocque <dal...@hopcount.org> Authored: Mon Jun 6 02:24:12 2016 -0400 Committer: Dan LaRocque <dal...@hopcount.org> Committed: Mon Jun 6 03:10:03 2016 -0400 ---------------------------------------------------------------------- .../process/computer/GiraphWorkerContext.java | 3 +- .../gremlin/structure/io/gryo/GryoMapper.java | 30 ++- .../gremlin/structure/io/gryo/GryoPool.java | 1 + .../structure/io/gryo/GryoSerializers.java | 40 ++-- .../structure/io/gryo/JavaTimeSerializers.java | 127 +++++------- .../structure/io/gryo/PairSerializer.java | 11 +- .../structure/io/gryo/TypeRegistration.java | 12 ++ .../io/gryo/kryoshim/KryoShimService.java | 16 ++ .../io/gryo/kryoshim/KryoShimServiceLoader.java | 23 ++- .../io/gryo/kryoshim/SerializerShim.java | 2 +- .../hadoop/process/computer/HadoopCombine.java | 3 +- .../hadoop/process/computer/HadoopMap.java | 3 +- .../hadoop/process/computer/HadoopReduce.java | 3 +- .../structure/io/HadoopPoolShimService.java | 7 + .../structure/io/HadoopPoolsConfigurable.java | 4 +- .../structure/io/gryo/GryoRecordReader.java | 3 +- .../structure/io/gryo/GryoRecordWriter.java | 4 +- .../spark/process/computer/SparkExecutor.java | 3 +- .../structure/io/TinkerPopKryoRegistrator.java | 121 ------------ .../spark/structure/io/gryo/GryoSerializer.java | 2 +- .../io/gryo/IoRegistryAwareKryoSerializer.java | 116 +++++++++++ .../io/gryo/TinkerPopKryoRegistrator.java | 194 +++++++++++++++++++ .../unshaded/UnshadedKryoShimService.java | 131 ++++++++----- ...n.structure.io.gryo.kryoshim.KryoShimService | 1 + 
.../spark/structure/io/ToyGraphInputRDD.java | 3 +- 25 files changed, 572 insertions(+), 291 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphWorkerContext.java ---------------------------------------------------------------------- diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphWorkerContext.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphWorkerContext.java index 86b733c..0122ab4 100644 --- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphWorkerContext.java +++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphWorkerContext.java @@ -28,6 +28,7 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil; import org.apache.tinkerpop.gremlin.process.computer.VertexProgram; import org.apache.tinkerpop.gremlin.process.computer.util.ImmutableMemory; import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramPool; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader; import java.util.Iterator; @@ -45,7 +46,7 @@ public final class GiraphWorkerContext extends WorkerContext { public void preApplication() throws InstantiationException, IllegalAccessException { final Configuration apacheConfiguration = ConfUtil.makeApacheConfiguration(this.getContext().getConfiguration()); - HadoopPools.initialize(apacheConfiguration); + KryoShimServiceLoader.applyConfiguration(apacheConfiguration); final VertexProgram vertexProgram = VertexProgram.createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration); this.vertexProgramPool = new VertexProgramPool(vertexProgram, 
this.getContext().getConfiguration().getInt(GiraphConstants.NUM_COMPUTE_THREADS.getKey(), 1)); this.memory = new GiraphMemory(this, vertexProgram); http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java index 851b03c..41ca44d 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoMapper.java @@ -18,6 +18,7 @@ */ package org.apache.tinkerpop.gremlin.structure.io.gryo; +import org.apache.commons.lang.builder.ToStringBuilder; import org.apache.tinkerpop.gremlin.process.computer.GraphFilter; import org.apache.tinkerpop.gremlin.process.computer.MapReduce; import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory; @@ -369,6 +370,20 @@ public final class GryoMapper implements Mapper<Kryo> { private Supplier<ClassResolver> classResolver = GryoClassResolver::new; private Builder() { + // Validate the default registrations + // For justification of these default registration rules, see TinkerPopKryoRegistrator + for (TypeRegistration<?> tr : typeRegistrations) { + if (tr.hasSerializer() /* no serializer is acceptable */ && + null == tr.getSerializerShim() /* a shim serializer is acceptable */ && + !(tr.getShadedSerializer() instanceof JavaSerializer) /* shaded JavaSerializer is acceptable */) { + // everything else is invalid + String msg = String.format("The default GryoMapper type registration %s is invalid. " + + "It must supply either an implementation of %s or %s, but supplies neither. 
" + + "This is probably a bug in GryoMapper's default serialization class registrations.", tr, + SerializerShim.class.getCanonicalName(), JavaSerializer.class.getCanonicalName()); + throw new IllegalStateException(msg); + } + } } /** @@ -538,8 +553,8 @@ public final class GryoMapper implements Mapper<Kryo> { if (1 < serializerCount) { String msg = String.format( "GryoTypeReg accepts at most one kind of serializer, but multiple " + - "serializers were supplied for class %s (id %s). " + - "Shaded serializer: %s. Shim serializer: %s. Shaded serializer function: %s.", + "serializers were supplied for class %s (id %s). " + + "Shaded serializer: %s. Shim serializer: %s. Shaded serializer function: %s.", this.clazz.getCanonicalName(), id, this.shadedSerializer, this.serializerShim, this.functionOfShadedKryo); throw new IllegalArgumentException(msg); @@ -603,5 +618,16 @@ public final class GryoMapper implements Mapper<Kryo> { return kryo; } + + @Override + public String toString() { + return new ToStringBuilder(this) + .append("targetClass", clazz) + .append("id", id) + .append("shadedSerializer", shadedSerializer) + .append("serializerShim", serializerShim) + .append("functionOfShadedKryo", functionOfShadedKryo) + .toString(); + } } } http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java index e7bf636..59f8a5d 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoPool.java @@ -40,6 +40,7 @@ import java.util.function.Function; public final class GryoPool { public static final String 
CONFIG_IO_REGISTRY = "gremlin.io.registry"; public static final String CONFIG_IO_GRYO_POOL_SIZE = "gremlin.io.gryo.poolSize"; + public static final int CONFIG_IO_GRYO_POOL_SIZE_DEFAULT = 256; public enum Type {READER, WRITER, READER_WRITER} http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoSerializers.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoSerializers.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoSerializers.java index ae99ac6..16fbe85 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoSerializers.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/GryoSerializers.java @@ -23,16 +23,16 @@ import org.apache.tinkerpop.gremlin.structure.Edge; import org.apache.tinkerpop.gremlin.structure.Property; import org.apache.tinkerpop.gremlin.structure.Vertex; import org.apache.tinkerpop.gremlin.structure.VertexProperty; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.InputShim; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShim; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.OutputShim; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.SerializerShim; import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedEdge; import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedFactory; import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedPath; import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedProperty; import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertex; import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty; -import org.apache.tinkerpop.shaded.kryo.Kryo; -import 
org.apache.tinkerpop.shaded.kryo.Serializer; -import org.apache.tinkerpop.shaded.kryo.io.Input; -import org.apache.tinkerpop.shaded.kryo.io.Output; /** * Class used to serialize graph-based objects such as vertices, edges, properties, and paths. These objects are @@ -42,19 +42,19 @@ import org.apache.tinkerpop.shaded.kryo.io.Output; * @author Stephen Mallette (http://stephen.genoprime.com) * @author Marko A. Rodriguez (http://markorodriguez.com) */ -final class GryoSerializers { +public final class GryoSerializers { /** * Serializes any {@link Edge} implementation encountered to a {@link DetachedEdge}. */ - final static class EdgeSerializer extends Serializer<Edge> { + final static class EdgeSerializer implements SerializerShim<Edge> { @Override - public void write(final Kryo kryo, final Output output, final Edge edge) { + public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, Edge edge) { kryo.writeClassAndObject(output, DetachedFactory.detach(edge, true)); } @Override - public Edge read(final Kryo kryo, final Input input, final Class<Edge> edgeClass) { + public <I extends InputShim> Edge read(KryoShim<I, ?> kryo, I input, Class<Edge> edgeClass) { final Object o = kryo.readClassAndObject(input); return (Edge) o; } @@ -63,14 +63,14 @@ final class GryoSerializers { /** * Serializes any {@link Vertex} implementation encountered to an {@link DetachedVertex}. 
*/ - final static class VertexSerializer extends Serializer<Vertex> { + final static class VertexSerializer implements SerializerShim<Vertex> { @Override - public void write(final Kryo kryo, final Output output, final Vertex vertex) { + public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, Vertex vertex) { kryo.writeClassAndObject(output, DetachedFactory.detach(vertex, true)); } @Override - public Vertex read(final Kryo kryo, final Input input, final Class<Vertex> vertexClass) { + public <I extends InputShim> Vertex read(KryoShim<I, ?> kryo, I input, Class<Vertex> vertexClass) { return (Vertex) kryo.readClassAndObject(input); } } @@ -78,14 +78,14 @@ final class GryoSerializers { /** * Serializes any {@link Property} implementation encountered to an {@link DetachedProperty}. */ - final static class PropertySerializer extends Serializer<Property> { + final static class PropertySerializer implements SerializerShim<Property> { @Override - public void write(final Kryo kryo, final Output output, final Property property) { + public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, Property property) { kryo.writeClassAndObject(output, property instanceof VertexProperty ? DetachedFactory.detach((VertexProperty) property, true) : DetachedFactory.detach(property)); } @Override - public Property read(final Kryo kryo, final Input input, final Class<Property> propertyClass) { + public <I extends InputShim> Property read(KryoShim<I, ?> kryo, I input, Class<Property> propertyClass) { return (Property) kryo.readClassAndObject(input); } } @@ -93,14 +93,14 @@ final class GryoSerializers { /** * Serializes any {@link VertexProperty} implementation encountered to an {@link DetachedVertexProperty}. 
*/ - final static class VertexPropertySerializer extends Serializer<VertexProperty> { + final static class VertexPropertySerializer implements SerializerShim<VertexProperty> { @Override - public void write(final Kryo kryo, final Output output, final VertexProperty vertexProperty) { + public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, VertexProperty vertexProperty) { kryo.writeClassAndObject(output, DetachedFactory.detach(vertexProperty, true)); } @Override - public VertexProperty read(final Kryo kryo, final Input input, final Class<VertexProperty> vertexPropertyClass) { + public <I extends InputShim> VertexProperty read(KryoShim<I, ?> kryo, I input, Class<VertexProperty> vertexPropertyClass) { return (VertexProperty) kryo.readClassAndObject(input); } } @@ -108,14 +108,14 @@ final class GryoSerializers { /** * Serializes any {@link Path} implementation encountered to an {@link DetachedPath}. */ - final static class PathSerializer extends Serializer<Path> { + public final static class PathSerializer implements SerializerShim<Path> { @Override - public void write(final Kryo kryo, final Output output, final Path path) { + public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, Path path) { kryo.writeClassAndObject(output, DetachedFactory.detach(path, false)); } @Override - public Path read(final Kryo kryo, final Input input, final Class<Path> pathClass) { + public <I extends InputShim> Path read(KryoShim<I, ?> kryo, I input, Class<Path> pathClass) { return (Path) kryo.readClassAndObject(input); } } http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/JavaTimeSerializers.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/JavaTimeSerializers.java 
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/JavaTimeSerializers.java index 1d4e236..8b14345 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/JavaTimeSerializers.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/JavaTimeSerializers.java @@ -18,10 +18,10 @@ */ package org.apache.tinkerpop.gremlin.structure.io.gryo; -import org.apache.tinkerpop.shaded.kryo.Kryo; -import org.apache.tinkerpop.shaded.kryo.Serializer; -import org.apache.tinkerpop.shaded.kryo.io.Input; -import org.apache.tinkerpop.shaded.kryo.io.Output; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.InputShim; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShim; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.OutputShim; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.SerializerShim; import java.time.Duration; import java.time.Instant; @@ -48,17 +48,14 @@ final class JavaTimeSerializers { /** * Serializer for the {@link Duration} class. */ - final static class DurationSerializer extends Serializer<Duration> - { + final static class DurationSerializer implements SerializerShim<Duration> { @Override - public void write(final Kryo kryo, final Output output, final Duration duration) - { + public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, Duration duration) { output.writeLong(duration.toNanos()); } @Override - public Duration read(final Kryo kryo, final Input input, final Class<Duration> durationClass) - { + public <I extends InputShim> Duration read(KryoShim<I, ?> kryo, I input, Class<Duration> durationClass) { return Duration.ofNanos(input.readLong()); } } @@ -66,18 +63,15 @@ final class JavaTimeSerializers { /** * Serializer for the {@link Instant} class. 
*/ - final static class InstantSerializer extends Serializer<Instant> - { + final static class InstantSerializer implements SerializerShim<Instant> { @Override - public void write(Kryo kryo, Output output, Instant instant) - { + public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, Instant instant) { output.writeLong(instant.getEpochSecond()); output.writeInt(instant.getNano()); } @Override - public Instant read(Kryo kryo, Input input, Class<Instant> aClass) - { + public <I extends InputShim> Instant read(KryoShim<I, ?> kryo, I input, Class<Instant> aClass) { return Instant.ofEpochSecond(input.readLong(), input.readInt()); } } @@ -85,17 +79,14 @@ final class JavaTimeSerializers { /** * Serializer for the {@link LocalDate} class. */ - final static class LocalDateSerializer extends Serializer<LocalDate> - { + final static class LocalDateSerializer implements SerializerShim<LocalDate> { @Override - public void write(final Kryo kryo, final Output output, final LocalDate localDate) - { + public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, LocalDate localDate) { output.writeLong(localDate.toEpochDay()); } @Override - public LocalDate read(final Kryo kryo, final Input input, final Class<LocalDate> clazz) - { + public <I extends InputShim> LocalDate read(KryoShim<I, ?> kryo, I input, Class<LocalDate> clazz) { return LocalDate.ofEpochDay(input.readLong()); } } @@ -103,11 +94,9 @@ final class JavaTimeSerializers { /** * Serializer for the {@link LocalDateTime} class. 
*/ - final static class LocalDateTimeSerializer extends Serializer<LocalDateTime> - { + final static class LocalDateTimeSerializer implements SerializerShim<LocalDateTime> { @Override - public void write(final Kryo kryo, final Output output, final LocalDateTime localDateTime) - { + public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, LocalDateTime localDateTime) { output.writeInt(localDateTime.getYear()); output.writeInt(localDateTime.getMonthValue()); output.writeInt(localDateTime.getDayOfMonth()); @@ -118,8 +107,7 @@ final class JavaTimeSerializers { } @Override - public LocalDateTime read(final Kryo kryo, final Input input, final Class<LocalDateTime> clazz) - { + public <I extends InputShim> LocalDateTime read(KryoShim<I, ?> kryo, I input, Class<LocalDateTime> clazz) { return LocalDateTime.of(input.readInt(), input.readInt(), input.readInt(), input.readInt(), input.readInt(), input.readInt(), input.readInt()); } } @@ -127,17 +115,14 @@ final class JavaTimeSerializers { /** * Serializer for the {@link LocalTime} class. */ - final static class LocalTimeSerializer extends Serializer<LocalTime> - { + final static class LocalTimeSerializer implements SerializerShim<LocalTime> { @Override - public void write(final Kryo kryo, final Output output, final LocalTime localTime) - { + public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, LocalTime localTime) { output.writeLong(localTime.toNanoOfDay()); } @Override - public LocalTime read(final Kryo kryo, final Input input, final Class<LocalTime> clazz) - { + public <I extends InputShim> LocalTime read(KryoShim<I, ?> kryo, I input, Class<LocalTime> clazz) { return LocalTime.ofNanoOfDay(input.readLong()); } } @@ -145,37 +130,31 @@ final class JavaTimeSerializers { /** * Serializer for the {@link MonthDay} class. 
*/ - final static class MonthDaySerializer extends Serializer<MonthDay> - { + final static class MonthDaySerializer implements SerializerShim<MonthDay> { @Override - public void write(final Kryo kryo, final Output output, final MonthDay monthDay) - { + public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, MonthDay monthDay) { output.writeInt(monthDay.getMonthValue()); output.writeInt(monthDay.getDayOfMonth()); } @Override - public MonthDay read(final Kryo kryo, final Input input, final Class<MonthDay> clazz) - { - return MonthDay.of(input.readInt(), input.readInt()); + public <I extends InputShim> MonthDay read(KryoShim<I, ?> kryo, I input, Class<MonthDay> clazz) { + return MonthDay.of(input.readInt(), input.readInt()); } } /** * Serializer for the {@link OffsetDateTime} class. */ - final static class OffsetDateTimeSerializer extends Serializer<OffsetDateTime> - { + final static class OffsetDateTimeSerializer implements SerializerShim<OffsetDateTime> { @Override - public void write(final Kryo kryo, final Output output, final OffsetDateTime offsetDateTime) - { + public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, OffsetDateTime offsetDateTime) { kryo.writeObject(output, offsetDateTime.toLocalDateTime()); kryo.writeObject(output, offsetDateTime.getOffset()); } @Override - public OffsetDateTime read(final Kryo kryo, final Input input, final Class<OffsetDateTime> clazz) - { + public <I extends InputShim> OffsetDateTime read(KryoShim<I, ?> kryo, I input, Class<OffsetDateTime> clazz) { return OffsetDateTime.of(kryo.readObject(input, LocalDateTime.class), kryo.readObject(input, ZoneOffset.class)); } } @@ -183,18 +162,15 @@ final class JavaTimeSerializers { /** * Serializer for the {@link OffsetTime} class.
*/ - final static class OffsetTimeSerializer extends Serializer<OffsetTime> - { + final static class OffsetTimeSerializer implements SerializerShim<OffsetTime> { @Override - public void write(final Kryo kryo, final Output output, final OffsetTime offsetTime) - { + public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, OffsetTime offsetTime) { kryo.writeObject(output, offsetTime.toLocalTime()); kryo.writeObject(output, offsetTime.getOffset()); } @Override - public OffsetTime read(final Kryo kryo, final Input input, final Class<OffsetTime> clazz) - { + public <I extends InputShim> OffsetTime read(KryoShim<I, ?> kryo, I input, Class<OffsetTime> clazz) { return OffsetTime.of(kryo.readObject(input, LocalTime.class), kryo.readObject(input, ZoneOffset.class)); } } @@ -202,19 +178,16 @@ final class JavaTimeSerializers { /** * Serializer for the {@link Period} class. */ - final static class PeriodSerializer extends Serializer<Period> - { + final static class PeriodSerializer implements SerializerShim<Period> { @Override - public void write(final Kryo kryo, final Output output, final Period period) - { + public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, Period period) { output.writeInt(period.getYears()); output.writeInt(period.getMonths()); output.writeInt(period.getDays()); } @Override - public Period read(final Kryo kryo, final Input input, final Class<Period> clazz) - { + public <I extends InputShim> Period read(KryoShim<I, ?> kryo, I input, Class<Period> clazz) { return Period.of(input.readInt(), input.readInt(), input.readInt()); } } @@ -222,17 +195,14 @@ final class JavaTimeSerializers { /** * Serializer for the {@link Year} class. 
*/ - final static class YearSerializer extends Serializer<Year> - { + final static class YearSerializer implements SerializerShim<Year> { @Override - public void write(final Kryo kryo, final Output output, final Year year) - { + public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, Year year) { output.writeInt(year.getValue()); } @Override - public Year read(final Kryo kryo, final Input input, final Class<Year> clazz) - { + public <I extends InputShim> Year read(KryoShim<I, ?> kryo, I input, Class<Year> clazz) { return Year.of(input.readInt()); } } @@ -240,18 +210,15 @@ final class JavaTimeSerializers { /** * Serializer for the {@link YearMonth} class. */ - final static class YearMonthSerializer extends Serializer<YearMonth> - { + final static class YearMonthSerializer implements SerializerShim<YearMonth> { @Override - public void write(final Kryo kryo, final Output output, final YearMonth monthDay) - { + public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, YearMonth monthDay) { output.writeInt(monthDay.getYear()); output.writeInt(monthDay.getMonthValue()); } @Override - public YearMonth read(final Kryo kryo, final Input input, final Class<YearMonth> clazz) - { + public <I extends InputShim> YearMonth read(KryoShim<I, ?> kryo, I input, Class<YearMonth> clazz) { return YearMonth.of(input.readInt(), input.readInt()); } } @@ -259,11 +226,9 @@ final class JavaTimeSerializers { /** * Serializer for the {@link ZonedDateTime} class. 
*/ - final static class ZonedDateTimeSerializer extends Serializer<ZonedDateTime> - { + final static class ZonedDateTimeSerializer implements SerializerShim<ZonedDateTime> { @Override - public void write(final Kryo kryo, final Output output, final ZonedDateTime zonedDateTime) - { + public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, ZonedDateTime zonedDateTime) { output.writeInt(zonedDateTime.getYear()); output.writeInt(zonedDateTime.getMonthValue()); output.writeInt(zonedDateTime.getDayOfMonth()); @@ -275,8 +240,7 @@ final class JavaTimeSerializers { } @Override - public ZonedDateTime read(final Kryo kryo, final Input input, final Class<ZonedDateTime> clazz) - { + public <I extends InputShim> ZonedDateTime read(KryoShim<I, ?> kryo, I input, Class<ZonedDateTime> clazz) { return ZonedDateTime.of(input.readInt(), input.readInt(), input.readInt(), input.readInt(), input.readInt(), input.readInt(), input.readInt(), ZoneId.of(input.readString())); @@ -286,17 +250,14 @@ final class JavaTimeSerializers { /** * Serializer for the {@link ZoneOffset} class. 
*/ - final static class ZoneOffsetSerializer extends Serializer<ZoneOffset> - { + final static class ZoneOffsetSerializer implements SerializerShim<ZoneOffset> { @Override - public void write(final Kryo kryo, final Output output, final ZoneOffset zoneOffset) - { + public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, ZoneOffset zoneOffset) { output.writeString(zoneOffset.getId()); } @Override - public ZoneOffset read(final Kryo kryo, final Input input, final Class<ZoneOffset> clazz) - { + public <I extends InputShim> ZoneOffset read(KryoShim<I, ?> kryo, I input, Class<ZoneOffset> clazz) { return ZoneOffset.of(input.readString()); } } http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/PairSerializer.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/PairSerializer.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/PairSerializer.java index e5e92e7..0464b22 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/PairSerializer.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/PairSerializer.java @@ -18,6 +18,10 @@ */ package org.apache.tinkerpop.gremlin.structure.io.gryo; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.InputShim; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShim; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.OutputShim; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.SerializerShim; import org.apache.tinkerpop.shaded.kryo.Kryo; import org.apache.tinkerpop.shaded.kryo.Serializer; import org.apache.tinkerpop.shaded.kryo.io.Input; @@ -27,16 +31,15 @@ import org.javatuples.Pair; /** * @author Daniel Kuppitz (http://gremlin.guru) */ -final class PairSerializer extends 
Serializer<Pair> { - +final class PairSerializer implements SerializerShim<Pair> { @Override - public void write(final Kryo kryo, final Output output, final Pair pair) { + public <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, Pair pair) { kryo.writeClassAndObject(output, pair.getValue0()); kryo.writeClassAndObject(output, pair.getValue1()); } @Override - public Pair read(final Kryo kryo, final Input input, final Class<Pair> pairClass) { + public <I extends InputShim> Pair read(KryoShim<I, ?> kryo, I input, Class<Pair> pairClass) { return Pair.with(kryo.readClassAndObject(input), kryo.readClassAndObject(input)); } } http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/TypeRegistration.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/TypeRegistration.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/TypeRegistration.java index ef105ce..1f41c0d 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/TypeRegistration.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/TypeRegistration.java @@ -67,4 +67,16 @@ public interface TypeRegistration<T> { * @return the sole parameter */ Kryo registerWith(Kryo kryo); + + /** + * Returns true if at least one of {@link #getShadedSerializer()}, {@link #getSerializerShim()}, or + * {@link #getFunctionOfShadedKryo()} is non null. Returns false if all are null. 
+ * + * @return whether a serializer is defined for this type registration + */ + default boolean hasSerializer() { + return null != getFunctionOfShadedKryo() || + null != getSerializerShim() || + null != getShadedSerializer(); + } } http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimService.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimService.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimService.java index 959605c..7783856 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimService.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimService.java @@ -18,6 +18,8 @@ */ package org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim; +import org.apache.commons.configuration.Configuration; + import java.io.InputStream; import java.io.OutputStream; @@ -80,4 +82,18 @@ public interface KryoShimService { * @return this implementation's priority value */ int getPriority(); + + /** + * Attempt to incorporate the supplied configuration in future read/write calls. + * <p> + * This method is a wart that exists essentially just to support the old + * {@code HadoopPools.initialize(Configuration)} use-case. + * <p> + * This method is not guaranteed to have any effect on an instance of this interface + * after {@link #writeClassAndObject(Object, OutputStream)} or {@link #readClassAndObject(InputStream)} + * has been invoked on that particular instance.
+ * + * @param conf the configuration to apply to this service's internal serializer + */ + void applyConfiguration(Configuration conf); } http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java index 9ccf2de..9184dd0 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java @@ -18,6 +18,7 @@ */ package org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim; +import org.apache.commons.configuration.Configuration; import org.apache.tinkerpop.shaded.kryo.io.Input; import org.apache.tinkerpop.shaded.kryo.io.Output; import org.slf4j.Logger; @@ -35,7 +36,9 @@ import java.util.ServiceLoader; */ public class KryoShimServiceLoader { - private static volatile KryoShimService CACHED_SHIM_SERVICE; + private static volatile KryoShimService cachedShimService; + + private static volatile Configuration conf; private static final Logger log = LoggerFactory.getLogger(KryoShimServiceLoader.class); @@ -46,6 +49,10 @@ public class KryoShimServiceLoader { */ public static final String SHIM_CLASS_SYSTEM_PROPERTY = "tinkerpop.kryo.shim"; + public static void applyConfiguration(Configuration conf) { + KryoShimServiceLoader.conf = conf; + } + /** * Return a reference to the shim service. This method may return a cached shim service * unless {@code forceReload} is true. 
Calls to this method need not be externally @@ -58,8 +65,8 @@ public class KryoShimServiceLoader { */ public static KryoShimService load(boolean forceReload) { - if (null != CACHED_SHIM_SERVICE && !forceReload) { - return CACHED_SHIM_SERVICE; + if (null != cachedShimService && !forceReload) { + return cachedShimService; } ArrayList<KryoShimService> services = new ArrayList<>(); @@ -109,7 +116,15 @@ public class KryoShimServiceLoader { log.info("Set {} provider to {} ({}) because its priority value ({}) is the highest available", KryoShimService.class.getSimpleName(), result, result.getClass(), result.getPriority()); - return CACHED_SHIM_SERVICE = result; + Configuration userConf = conf; + + if (null != userConf) { + log.info("Configuring {} provider {} with user-provided configuration", + KryoShimService.class.getSimpleName(), result); + result.applyConfiguration(userConf); + } + + return cachedShimService = result; } /** http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/SerializerShim.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/SerializerShim.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/SerializerShim.java index 191cdd8..e5f9005 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/SerializerShim.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/SerializerShim.java @@ -26,7 +26,7 @@ package org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim; */ public interface SerializerShim<T> { - <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, T starGraph); + <O extends OutputShim> void write(KryoShim<?, O> kryo, O output, T object); <I extends InputShim> T read(KryoShim<I, ?> kryo, I input, 
Class<T> clazz); http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopCombine.java ---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopCombine.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopCombine.java index de1e2f9..06778e6 100644 --- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopCombine.java +++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopCombine.java @@ -25,6 +25,7 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools; import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable; import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil; import org.apache.tinkerpop.gremlin.process.computer.MapReduce; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader; import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,7 +48,7 @@ public final class HadoopCombine extends Reducer<ObjectWritable, ObjectWritable, @Override public void setup(final Reducer<ObjectWritable, ObjectWritable, ObjectWritable, ObjectWritable>.Context context) { final Configuration apacheConfiguration = ConfUtil.makeApacheConfiguration(context.getConfiguration()); - HadoopPools.initialize(apacheConfiguration); + KryoShimServiceLoader.applyConfiguration(apacheConfiguration); this.mapReduce = MapReduce.createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration); this.mapReduce.workerStart(MapReduce.Stage.COMBINE); } http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopMap.java 
---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopMap.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopMap.java index 9e6fac3..5fc7026 100644 --- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopMap.java +++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopMap.java @@ -28,6 +28,7 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable; import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil; import org.apache.tinkerpop.gremlin.process.computer.MapReduce; import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,7 +51,7 @@ public final class HadoopMap extends Mapper<NullWritable, VertexWritable, Object @Override public void setup(final Mapper<NullWritable, VertexWritable, ObjectWritable, ObjectWritable>.Context context) { final Configuration apacheConfiguration = ConfUtil.makeApacheConfiguration(context.getConfiguration()); - HadoopPools.initialize(apacheConfiguration); + KryoShimServiceLoader.applyConfiguration(apacheConfiguration); this.mapReduce = MapReduce.createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration); this.mapReduce.workerStart(MapReduce.Stage.MAP); } http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopReduce.java ---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopReduce.java 
b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopReduce.java index 06dfba1..6ca7b8f 100644 --- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopReduce.java +++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopReduce.java @@ -25,6 +25,7 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools; import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable; import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil; import org.apache.tinkerpop.gremlin.process.computer.MapReduce; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader; import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,7 +48,7 @@ public final class HadoopReduce extends Reducer<ObjectWritable, ObjectWritable, @Override public void setup(final Reducer<ObjectWritable, ObjectWritable, ObjectWritable, ObjectWritable>.Context context) { final Configuration apacheConfiguration = ConfUtil.makeApacheConfiguration(context.getConfiguration()); - HadoopPools.initialize(apacheConfiguration); + KryoShimServiceLoader.applyConfiguration(apacheConfiguration); this.mapReduce = MapReduce.createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration); this.mapReduce.workerStart(MapReduce.Stage.REDUCE); } http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java ---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java index c19b914..5753d90 100644 --- 
a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java +++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java @@ -18,7 +18,8 @@ */ package org.apache.tinkerpop.gremlin.hadoop.structure.io; +import org.apache.commons.configuration.Configuration; import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService; import org.apache.tinkerpop.shaded.kryo.Kryo; import org.apache.tinkerpop.shaded.kryo.io.Input; import org.apache.tinkerpop.shaded.kryo.io.Output; @@ -66,4 +67,9 @@ public class HadoopPoolShimService implements KryoShimService { public int getPriority() { return 0; } + + @Override + public void applyConfiguration(Configuration conf) { + HadoopPools.initialize(conf); // feed HadoopPools directly: KryoShimServiceLoader calls this method, so delegating back to it was a no-op + } } http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolsConfigurable.java ---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolsConfigurable.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolsConfigurable.java index f3a1bac..0e5f135 100644 --- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolsConfigurable.java +++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolsConfigurable.java @@ -20,6 +20,8 @@ package org.apache.tinkerpop.gremlin.hadoop.structure.io; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; +import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader; /** * @author Marko A.
Rodriguez (http://markorodriguez.com) @@ -28,7 +30,7 @@ public interface HadoopPoolsConfigurable extends Configurable { @Override public default void setConf(final Configuration configuration) { - HadoopPools.initialize(configuration); + KryoShimServiceLoader.applyConfiguration(ConfUtil.makeApacheConfiguration(configuration)); } @Override http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java ---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java index d7ed46b..a1daddf 100644 --- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java +++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordReader.java @@ -37,6 +37,7 @@ import org.apache.tinkerpop.gremlin.structure.Vertex; import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoMapper; import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoReader; import org.apache.tinkerpop.gremlin.structure.io.gryo.VertexTerminator; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; @@ -71,7 +72,7 @@ public final class GryoRecordReader extends RecordReader<NullWritable, VertexWri final Configuration configuration = context.getConfiguration(); if (configuration.get(Constants.GREMLIN_HADOOP_GRAPH_FILTER, null) != null) this.graphFilter = VertexProgramHelper.deserialize(ConfUtil.makeApacheConfiguration(configuration), Constants.GREMLIN_HADOOP_GRAPH_FILTER); - HadoopPools.initialize(configuration); + 
KryoShimServiceLoader.applyConfiguration(ConfUtil.makeApacheConfiguration(configuration)); this.gryoReader = HadoopPools.getGryoPool().takeReader(); long start = split.getStart(); final Path file = split.getPath(); http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordWriter.java ---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordWriter.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordWriter.java index 67a8339..2ea3394 100644 --- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordWriter.java +++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/gryo/GryoRecordWriter.java @@ -25,8 +25,10 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.tinkerpop.gremlin.hadoop.Constants; import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools; import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable; +import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil; import org.apache.tinkerpop.gremlin.structure.Direction; import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoWriter; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader; import java.io.DataOutputStream; import java.io.IOException; @@ -43,7 +45,7 @@ public final class GryoRecordWriter extends RecordWriter<NullWritable, VertexWri public GryoRecordWriter(final DataOutputStream outputStream, final Configuration configuration) { this.outputStream = outputStream; this.hasEdges = configuration.getBoolean(Constants.GREMLIN_HADOOP_GRAPH_WRITER_HAS_EDGES, true); - HadoopPools.initialize(configuration); + 
KryoShimServiceLoader.applyConfiguration(ConfUtil.makeApacheConfiguration(configuration)); this.gryoWriter = HadoopPools.getGryoPool().takeWriter(); } http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java index c2b85dd..9e5ac53 100644 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java @@ -38,6 +38,7 @@ import org.apache.tinkerpop.gremlin.spark.process.computer.payload.Payload; import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload; import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewOutgoingPayload; import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewPayload; +import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader; import org.apache.tinkerpop.gremlin.structure.util.Attachable; import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedFactory; import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty; @@ -89,7 +90,7 @@ public final class SparkExecutor { graphRDD.leftOuterJoin(viewIncomingRDD)) // every other iteration may have views and messages // for each partition of vertices emit a view and their outgoing messages .mapPartitionsToPair(partitionIterator -> { - HadoopPools.initialize(apacheConfiguration); + KryoShimServiceLoader.applyConfiguration(apacheConfiguration); final VertexProgram<M> workerVertexProgram = 
VertexProgram.<VertexProgram<M>>createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration); // each partition(Spark)/worker(TP3) has a local copy of the vertex program (a worker's task) final String[] vertexComputeKeysArray = VertexProgramHelper.vertexComputeKeysAsArray(workerVertexProgram.getVertexComputeKeys()); // the compute keys as an array final SparkMessenger<M> messenger = new SparkMessenger<>(); http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/TinkerPopKryoRegistrator.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/TinkerPopKryoRegistrator.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/TinkerPopKryoRegistrator.java deleted file mode 100644 index 4c99e70..0000000 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/TinkerPopKryoRegistrator.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.tinkerpop.gremlin.spark.structure.io; - -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.Serializer; -import com.esotericsoftware.kryo.serializers.JavaSerializer; -import com.google.common.base.Preconditions; -import org.apache.spark.serializer.KryoRegistrator; -import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable; -import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable; -import org.apache.tinkerpop.gremlin.process.traversal.step.map.GroupStep; -import org.apache.tinkerpop.gremlin.process.traversal.step.map.OrderGlobalStep; -import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalExplanation; -import org.apache.tinkerpop.gremlin.spark.process.computer.payload.MessagePayload; -import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload; -import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewOutgoingPayload; -import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewPayload; -import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.ObjectWritableSerializer; -import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.VertexWritableSerializer; -import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedSerializerAdapter; -import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoMapper; -import org.apache.tinkerpop.gremlin.structure.io.gryo.TypeRegistration; -import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.SerializerShim; -import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -/** - * A spark.kryo.registrator implementation that installs TinkerPop types. - * This is intended for use with spark.serializer=KryoSerializer, not GryoSerializer. 
- */ -public class TinkerPopKryoRegistrator implements KryoRegistrator { - - private static final Logger log = LoggerFactory.getLogger(TinkerPopKryoRegistrator.class); - - @Override - public void registerClasses(Kryo kryo) { - // TinkerPop type registrations copied from GyroSerializer's constructor - kryo.register(MessagePayload.class); - kryo.register(ViewIncomingPayload.class); - kryo.register(ViewOutgoingPayload.class); - kryo.register(ViewPayload.class); - kryo.register(VertexWritable.class, new UnshadedSerializerAdapter<>(new VertexWritableSerializer())); - kryo.register(ObjectWritable.class, new UnshadedSerializerAdapter<>(new ObjectWritableSerializer<>())); - - Set<Class<?>> shimmedClasses = new HashSet<>(); - - Set<Class<?>> javaSerializationClasses = new HashSet<>(); - - // Copy GryoMapper's default registrations - for (TypeRegistration<?> tr : GryoMapper.build().create().getTypeRegistrations()) { - // Special case for JavaSerializer, which is generally implemented in terms of TinkerPop's - // problematic static GryoMapper/GryoSerializer pool (these are handled below the loop) - org.apache.tinkerpop.shaded.kryo.Serializer<?> shadedSerializer = tr.getShadedSerializer(); - SerializerShim<?> serializerShim = tr.getSerializerShim(); - if (null != shadedSerializer && - shadedSerializer.getClass().equals(org.apache.tinkerpop.shaded.kryo.serializers.JavaSerializer.class)) { - javaSerializationClasses.add(tr.getTargetClass()); - } else if (null != serializerShim) { - log.debug("Registering class {} to serializer shim {} (serializer shim class {})", - tr.getTargetClass(), serializerShim, serializerShim.getClass()); - kryo.register(tr.getTargetClass(), new UnshadedSerializerAdapter<>(serializerShim)); - shimmedClasses.add(tr.getTargetClass()); - } else { - // Register with the default behavior (FieldSerializer) - log.debug("Registering class {} with default serializer", tr.getTargetClass()); - kryo.register(tr.getTargetClass()); - } - } - - Map<Class<?>, 
Serializer<?>> javaSerializerReplacements = new HashMap<>(); - javaSerializerReplacements.put(GroupStep.GroupBiOperator.class, new JavaSerializer()); - javaSerializerReplacements.put(OrderGlobalStep.OrderBiOperator.class, null); - javaSerializerReplacements.put(TraversalExplanation.class, null); - - for (Map.Entry<Class<?>, Serializer<?>> e : javaSerializerReplacements.entrySet()) { - Class<?> c = e.getKey(); - Serializer<?> s = e.getValue(); - - if (javaSerializationClasses.remove(c)) { - if (null != s) { - log.debug("Registering class {} with serializer {}", c, s); - kryo.register(c, s); - } else { - log.debug("Registering class {} with default serializer", c); - kryo.register(c); - } - } else { - log.debug("Registering class {} with JavaSerializer", c); - kryo.register(c, new JavaSerializer()); - } - } - - // We really care about StarGraph's shim serializer, so make sure we registered it - if (!shimmedClasses.contains(StarGraph.class)) { - log.warn("No SerializerShim found for StarGraph"); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java index 2c1dfa2..28a4d55 100644 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java @@ -78,7 +78,7 @@ public final class GryoSerializer extends Serializer { } } this.gryoPool = GryoPool.build(). - poolSize(sparkConfiguration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, 256)). 
+ poolSize(sparkConfiguration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, GryoPool.CONFIG_IO_GRYO_POOL_SIZE_DEFAULT)). ioRegistries(makeApacheConfiguration(sparkConfiguration).getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())). initializeMapper(builder -> { try { http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java new file mode 100644 index 0000000..8b21e21 --- /dev/null +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Copyright DataStax, Inc. + * <p> + * Please see the included license file for details. 
+ */ +package org.apache.tinkerpop.gremlin.spark.structure.io.gryo; + +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.Serializer; +import org.apache.spark.SparkConf; +import org.apache.spark.serializer.KryoSerializer; +import org.apache.tinkerpop.gremlin.structure.io.IoRegistry; +import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoIo; +import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool; +import org.javatuples.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +/** + * A {@link KryoSerializer} that attempts to honor {@link GryoPool#CONFIG_IO_REGISTRY}. + */ +public class IoRegistryAwareKryoSerializer extends KryoSerializer { + + private final SparkConf conf; + + private static final Logger log = LoggerFactory.getLogger(IoRegistryAwareKryoSerializer.class); + + public IoRegistryAwareKryoSerializer(SparkConf conf) { + super(conf); + // store conf so that we can access its registry (if one is present) in newKryo() + this.conf = conf; + } + + @Override + public Kryo newKryo() { + Kryo kryo = super.newKryo(); + + return applyIoRegistryIfPresent(kryo); + } + + private Kryo applyIoRegistryIfPresent(Kryo kryo) { + if (!conf.contains(GryoPool.CONFIG_IO_REGISTRY)) { + log.info("SparkConf {} does not contain setting {}, skipping {} handling", + GryoPool.CONFIG_IO_REGISTRY, conf, IoRegistry.class.getCanonicalName()); + return kryo; + } + + String registryClassnames = conf.get(GryoPool.CONFIG_IO_REGISTRY); + + for (String registryClassname : registryClassnames.split(",")) { + final IoRegistry registry; + + try { + registry = (IoRegistry) Class.forName(registryClassname).newInstance(); + log.info("Instantiated {}", registryClassname); + } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { + log.error("Unable to reflectively instantiate the {} implementation named {}", + IoRegistry.class.getCanonicalName(), registryClassname, e); + return kryo; + } + + // Left is 
the class targeted for serialization, right is a mess of potential types, including + // a shaded Serializer impl, unshaded Serializer impl, or Function<shaded.Kryo,shaded.Serializer> + final List<Pair<Class, Object>> serializers = registry.find(GryoIo.class); + + if (null == serializers) { + log.info("Invoking find({}.class) returned null on registry {}; ignoring this registry", + GryoIo.class.getCanonicalName(), registry); + return kryo; + } + + for (Pair<Class, Object> p : serializers) { + if (null == p.getValue1()) { + // null on the right is fine + log.info("Registering {} with default serializer", p.getValue0()); + kryo.register(p.getValue0()); + } else if (p.getValue1() instanceof Serializer) { + // unshaded serializer on the right is fine + log.info("Registering {} with serializer {}", p.getValue0(), p.getValue1()); + kryo.register(p.getValue0(), (Serializer) p.getValue1()); + } else { + // anything else on the right is unsupported with Spark + log.error("Serializer {} found in {} must implement {} " + + "(the shaded interface {} is not supported on Spark). 
This class will be registered with " + + "the default behavior of Spark's KryoSerializer.", + p.getValue1(), registryClassname, Serializer.class.getCanonicalName(), + org.apache.tinkerpop.shaded.kryo.Serializer.class.getCanonicalName()); + kryo.register(p.getValue0()); + } + } + } + + return kryo; + } +} http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/TinkerPopKryoRegistrator.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/TinkerPopKryoRegistrator.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/TinkerPopKryoRegistrator.java new file mode 100644 index 0000000..bdb80fd --- /dev/null +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/TinkerPopKryoRegistrator.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+package org.apache.tinkerpop.gremlin.spark.structure.io.gryo;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import org.apache.spark.serializer.KryoRegistrator;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.spark.process.computer.payload.MessagePayload;
+import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
+import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewOutgoingPayload;
+import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewPayload;
+import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedSerializerAdapter;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoMapper;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.TypeRegistration;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.SerializerShim;
+import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A spark.kryo.registrator implementation that installs TinkerPop types.
+ * This is intended for use with spark.serializer=KryoSerializer, not GryoSerializer.
+ * <p>
+ * Registration always happens in the same order: first the Spark-specific types from
+ * {@link #getExtraRegistrations()}, then every default registration from {@link GryoMapper}.
+ * The ordering is significant, so it must stay stable.
+ */
+public class TinkerPopKryoRegistrator implements KryoRegistrator {
+
+    private static final Logger log = LoggerFactory.getLogger(TinkerPopKryoRegistrator.class);
+
+    /**
+     * Registers TinkerPop's classes with the supplied {@link Kryo} instance. This overload uses
+     * no serializer overrides and an empty blacklist.
+     *
+     * @param kryo the Kryo serializer instance with which to register types
+     */
+    @Override
+    public void registerClasses(Kryo kryo) {
+        registerClasses(kryo, Collections.emptyMap(), Collections.emptySet());
+    }
+
+    /**
+     * Register TinkerPop's classes with the supplied {@link Kryo} instance
+     * while honoring optional serializer overrides and an optional class blacklist.
+     * <p>
+     * For every candidate class the precedence is:
+     * <ol>
+     *   <li>the blacklist (skip the class entirely);</li>
+     *   <li>{@code serializerOverrides};</li>
+     *   <li>this class's own default.</li>
+     * </ol>
+     *
+     * @param kryo                the Kryo serializer instance with which to register types
+     * @param serializerOverrides serializer mappings that override this class's defaults. A {@code null}
+     *                            value means "register with Kryo's default serializer". The map itself
+     *                            must not be null; pass an empty map for no overrides.
+     * @param blacklist           classes which should not be registered at all, even if there is an override
+     *                            entry or if they would be registered by this class by default. Does not affect
+     *                            Kryo's built-in registrations, e.g. String.class. Must not be null.
+     */
+    public void registerClasses(Kryo kryo, Map<Class<?>, Serializer<?>> serializerOverrides, Set<Class<?>> blacklist) {
+        // Apply TinkerPop type registrations copied from GryoSerializer's constructor
+        for (Map.Entry<Class<?>, Serializer<?>> ent : getExtraRegistrations().entrySet()) {
+            final Class<?> targetClass = ent.getKey();
+            final Serializer<?> ser = ent.getValue();
+
+            // Blacklist first (it takes precedence), then any override, which registers the class itself
+            if (isBlacklisted(blacklist, targetClass)
+                    || checkForAndApplySerializerOverride(serializerOverrides, kryo, targetClass)) {
+                continue;
+            }
+
+            if (null == ser) {
+                log.debug("Registering {} with default serializer", targetClass);
+                kryo.register(targetClass);
+            } else {
+                log.debug("Registering {} with serializer {}", targetClass, ser);
+                kryo.register(targetClass, ser);
+            }
+        }
+
+        final Set<Class<?>> shimmedClassesFromGryoMapper = new HashSet<>();
+
+        // Apply GryoMapper's default registrations
+        for (TypeRegistration<?> tr : GryoMapper.build().create().getTypeRegistrations()) {
+            final Class<?> targetClass = tr.getTargetClass();
+
+            // Same precedence as above: blacklist, then overrides, then GryoMapper's default
+            if (isBlacklisted(blacklist, targetClass)
+                    || checkForAndApplySerializerOverride(serializerOverrides, kryo, targetClass)) {
+                continue;
+            }
+
+            if (registerGryoMapperDefault(kryo, tr)) {
+                shimmedClassesFromGryoMapper.add(targetClass);
+            }
+        }
+
+        // StarGraph's shim serializer is especially important on Spark for efficiency reasons,
+        // so log a warning if we failed to register it somehow
+        if (!shimmedClassesFromGryoMapper.contains(StarGraph.class)) {
+            log.warn("No SerializerShim found for StarGraph");
+        }
+    }
+
+    /**
+     * Reports whether {@code targetClass} is blacklisted, logging the skip when it is.
+     */
+    private static boolean isBlacklisted(Set<Class<?>> blacklist, Class<?> targetClass) {
+        if (blacklist.contains(targetClass)) {
+            log.debug("Not registering serializer for {} (blacklisted)", targetClass);
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Translates one of {@link GryoMapper}'s default (shaded) registrations into a registration
+     * on Spark's unshaded {@link Kryo}.
+     *
+     * @param kryo the unshaded Kryo instance to register with
+     * @param tr   the GryoMapper registration to translate
+     * @return true if the class was registered through a {@link SerializerShim} adapter, false otherwise
+     */
+    private static boolean registerGryoMapperDefault(Kryo kryo, TypeRegistration<?> tr) {
+        final Class<?> targetClass = tr.getTargetClass();
+        final org.apache.tinkerpop.shaded.kryo.Serializer<?> shadedSerializer = tr.getShadedSerializer();
+        final SerializerShim<?> serializerShim = tr.getSerializerShim();
+        final java.util.function.Function<
+                org.apache.tinkerpop.shaded.kryo.Kryo,
+                org.apache.tinkerpop.shaded.kryo.Serializer> functionOfShadedKryo = tr.getFunctionOfShadedKryo();
+
+        if (null != shadedSerializer) {
+            if (shadedSerializer.getClass().equals(org.apache.tinkerpop.shaded.kryo.serializers.JavaSerializer.class)) {
+                // Convert GryoMapper's shaded JavaSerializer mappings to their unshaded equivalents
+                log.debug("Registering {} with JavaSerializer", targetClass);
+                kryo.register(targetClass, new JavaSerializer());
+            } else {
+                // There's supposed to be a check in GryoMapper that prevents this from happening.
+                // Name the offending serializer's own class rather than the shaded Serializer base type.
+                log.error("GryoMapper's default serialization registration for {} is a {}. " +
+                                "This is probably a bug in TinkerPop (this is not a valid default registration). " +
+                                "I am configuring Spark to use Kryo's default serializer for this class, " +
+                                "but this may cause serialization failures at runtime.",
+                        targetClass,
+                        shadedSerializer.getClass().getCanonicalName());
+                kryo.register(targetClass);
+            }
+            return false;
+        }
+
+        if (null != serializerShim) {
+            // Wrap shim serializers in an adapter for Spark's unshaded Kryo
+            log.debug("Registering {} to serializer shim {} (class {})",
+                    targetClass, serializerShim, serializerShim.getClass());
+            kryo.register(targetClass, new UnshadedSerializerAdapter<>(serializerShim));
+            return true;
+        }
+
+        if (null != functionOfShadedKryo) {
+            // As with shaded serializers, there's supposed to be a check in GryoMapper that prevents this from happening
+            log.error("GryoMapper's default serialization registration for {} is a Function<{},{}>. " +
+                            "This is probably a bug in TinkerPop (this is not a valid default registration). " +
+                            "I am configuring Spark to use Kryo's default serializer instead of this function, " +
+                            "but this may cause serialization failures at runtime.",
+                    targetClass,
+                    org.apache.tinkerpop.shaded.kryo.Kryo.class.getCanonicalName(),
+                    org.apache.tinkerpop.shaded.kryo.Serializer.class.getCanonicalName());
+            kryo.register(targetClass);
+            return false;
+        }
+
+        // Register all other classes with the default behavior (FieldSerializer)
+        log.debug("Registering {} with default serializer", targetClass);
+        kryo.register(targetClass);
+        return false;
+    }
+
+    /**
+     * Returns the Spark-specific registrations that precede GryoMapper's defaults. A {@code null}
+     * value means "register with Kryo's default serializer".
+     */
+    private static LinkedHashMap<Class<?>, Serializer<?>> getExtraRegistrations() {
+
+        /* The map returned by this method MUST have a fixed iteration order!
+         *
+         * The order itself is irrelevant, so long as it is completely stable at runtime.
+         *
+         * LinkedHashMap satisfies this requirement (its contract specifies
+         * iteration in key-insertion-order).
+         */
+
+        final LinkedHashMap<Class<?>, Serializer<?>> m = new LinkedHashMap<>();
+        // The following entries were copied from GryoSerializer's constructor
+        // This could be turned into a static collection on GryoSerializer to avoid
+        // duplication, but it would be a bit cumbersome to do so without disturbing
+        // the ordering of the existing entries in that constructor, since not all
+        // of the entries are for TinkerPop (and the ordering is significant).
+        m.put(MessagePayload.class, null);
+        m.put(ViewIncomingPayload.class, null);
+        m.put(ViewOutgoingPayload.class, null);
+        m.put(ViewPayload.class, null);
+        m.put(VertexWritable.class, new UnshadedSerializerAdapter<>(new VertexWritableSerializer()));
+        m.put(ObjectWritable.class, new UnshadedSerializerAdapter<>(new ObjectWritableSerializer<>()));
+
+        return m;
+    }
+
+    /**
+     * Registers {@code targetClass} from {@code serializerOverrides} if it has an entry there.
+     * A {@code null} value registers Kryo's default serializer.
+     *
+     * @return true if an override entry existed and was applied; false if the caller should register the class
+     */
+    private static boolean checkForAndApplySerializerOverride(Map<Class<?>, Serializer<?>> serializerOverrides,
+                                                              Kryo kryo, Class<?> targetClass) {
+        if (!serializerOverrides.containsKey(targetClass)) {
+            return false;
+        }
+
+        final Serializer<?> ser = serializerOverrides.get(targetClass);
+        if (null == ser) {
+            // null means use Kryo's default serializer
+            log.debug("Registering {} with default serializer per overrides", targetClass);
+            kryo.register(targetClass);
+        } else {
+            // nonnull means use that serializer
+            log.debug("Registering {} with serializer {} per overrides", targetClass, ser);
+            kryo.register(targetClass, ser);
+        }
+        return true;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java
----------------------------------------------------------------------
diff --git
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java
index d0411e8..a524a97 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java
@@ -24,92 +24,131 @@
  */
 package org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded;
 
-import com.twitter.chill.KryoInstantiator;
-import com.twitter.chill.KryoPool;
-import com.twitter.chill.SerDeState;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
 import org.apache.spark.SparkConf;
-import org.apache.spark.serializer.KryoSerializer;
-import org.apache.tinkerpop.gremlin.spark.structure.io.TinkerPopKryoRegistrator;
+import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.IoRegistryAwareKryoSerializer;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool;
 import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.concurrent.LinkedBlockingQueue;
 
+/**
+ * {@link KryoShimService} backed by Spark's unshaded Kryo.
+ * <p>
+ * Kryo instances are created by an {@link IoRegistryAwareKryoSerializer}. They are kept in a static
+ * pool shared by every instance of this class. The pool is filled once, either on first use or by
+ * {@link #applyConfiguration(Configuration)}, whichever happens first.
+ */
 public class UnshadedKryoShimService implements KryoShimService {
 
-    public static final String SPARK_KRYO_POOL_SIZE_SYSTEM_PROPERTY = "tinkerpop.kryo.poolsize";
-
     private static final Logger log = LoggerFactory.getLogger(UnshadedKryoShimService.class);
-    private static final int SPARK_KRYO_POOL_SIZE_DEFAULT = 8;
 
-    private final KryoSerializer sparkKryoSerializer;
-    private final KryoPool kryoPool;
+    /** Pool of Kryo instances shared by all instances of this class; filled once by initialize(conf). */
+    private static final LinkedBlockingQueue<Kryo> KRYOS = new LinkedBlockingQueue<>();
 
-    public UnshadedKryoShimService() {
-        this(TinkerPopKryoRegistrator.class.getCanonicalName(), getDefaultKryoPoolSize());
-    }
+    /** Set to true (under the class lock) only after {@link #KRYOS} has been filled. */
+    private static volatile boolean initialized;
 
-    public UnshadedKryoShimService(String sparkKryoRegistratorClassname, int kryoPoolSize) {
-        SparkConf sparkConf = new SparkConf();
-        sparkConf.set("spark.serializer", KryoSerializer.class.getCanonicalName());
-        sparkConf.set("spark.kryo.registrator", sparkKryoRegistratorClassname);
-        sparkKryoSerializer = new KryoSerializer(sparkConf);
-        kryoPool = KryoPool.withByteArrayOutputStream(kryoPoolSize, new KryoInstantiator());
-    }
+    /**
+     * Creates the service. No Kryo instances are built until first use or until
+     * {@link #applyConfiguration(Configuration)} is called.
+     */
+    public UnshadedKryoShimService() { }
 
+    /**
+     * Reads one class-tagged object from {@code source} using a Kryo instance borrowed from the pool.
+     *
+     * @throws RuntimeException wrapping an {@link InterruptedException} if interrupted while waiting for a
+     *                          pooled Kryo instance (the thread's interrupt status is restored)
+     */
     @Override
     public Object readClassAndObject(InputStream source) {
-        SerDeState sds = null;
-        try {
-            sds = kryoPool.borrow();
-            sds.setInput(source);
-
-            return sds.readClassAndObject();
-        } finally {
-            kryoPool.release(sds);
-        }
+        final LinkedBlockingQueue<Kryo> kryos = initialize();
+
+        // Borrow outside the try: if borrowing fails there is nothing to give back.
+        // (Borrowing inside the try used to leave k null, and put(null) in finally threw NPE.)
+        final Kryo k = borrow(kryos);
+        try {
+            // NOTE(review): Kryo's Input buffers reads from source, so it may consume bytes past the end
+            // of this object; confirm callers hand in a stream that holds only this object.
+            return k.readClassAndObject(new Input(source));
+        } finally {
+            // offer, not put: put declares InterruptedException, and if it threw, this Kryo would never
+            // return to the pool. The queue is unbounded, so offer always succeeds.
+            kryos.offer(k);
+        }
     }
 
+    /**
+     * Writes {@code o}, tagged with its class, to {@code sink} using a Kryo instance borrowed from the
+     * pool. It then flushes the Kryo Output that wraps {@code sink}.
+     *
+     * @throws RuntimeException wrapping an {@link InterruptedException} if interrupted while waiting for a
+     *                          pooled Kryo instance (the thread's interrupt status is restored)
+     */
     @Override
     public void writeClassAndObject(Object o, OutputStream sink) {
-        SerDeState sds = null;
-        try {
-            sds = kryoPool.borrow();
-
-            sds.writeClassAndObject(o); // this writes to an internal buffer
-
-            sds.writeOutputTo(sink); // this copies the internal buffer to sink
-
-            sink.flush();
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        } finally {
-            kryoPool.release(sds);
-        }
+        final LinkedBlockingQueue<Kryo> kryos = initialize();
+
+        final Kryo k = borrow(kryos);
+        try {
+            final Output kryoOutput = new Output(sink);
+
+            k.writeClassAndObject(kryoOutput, o);
+
+            // Output buffers internally; flush pushes the buffered bytes through to sink
+            kryoOutput.flush();
+        } finally {
+            // offer, not put: see readClassAndObject
+            kryos.offer(k);
+        }
+    }
+
+    /**
+     * Takes a Kryo instance from the pool, blocking until one is free.
+     *
+     * @throws RuntimeException wrapping the {@link InterruptedException} if interrupted while waiting;
+     *                          the thread's interrupt status is restored before throwing
+     */
+    private static Kryo borrow(LinkedBlockingQueue<Kryo> kryos) {
+        try {
+            return kryos.take();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
     }
 
     @Override
     public int getPriority() {
-        return 1024;
+        return 50;
     }
 
-    private static int getDefaultKryoPoolSize() {
-        String raw = System.getProperty(SPARK_KRYO_POOL_SIZE_SYSTEM_PROPERTY);
+    /**
+     * Initializes the shared Kryo pool from {@code conf}.
+     * <p>
+     * Only the first initialization takes effect. The pool may already exist, either from an earlier
+     * call to this method or because a read/write ran first with an empty configuration. In that case
+     * {@code conf} is ignored.
+     */
+    @Override
+    public void applyConfiguration(Configuration conf) {
+        initialize(conf);
+    }
 
-        int size = SPARK_KRYO_POOL_SIZE_DEFAULT;
-        try {
-            size = Integer.valueOf(raw);
-            log.info("Setting kryo pool size to {} according to system property {}", size,
-                    SPARK_KRYO_POOL_SIZE_SYSTEM_PROPERTY);
-        } catch (NumberFormatException e) {
-            log.error("System property {}={} could not be parsed as an integer, using default value {}",
-                    SPARK_KRYO_POOL_SIZE_SYSTEM_PROPERTY, raw, size, e);
+    /** Returns the pool, filling it from an empty configuration if nothing has initialized it yet. */
+    private static LinkedBlockingQueue<Kryo> initialize() {
+        return initialize(new BaseConfiguration());
+    }
+
+    /**
+     * Fills {@link #KRYOS} at most once, then returns it. The steps are:
+     * <ol>
+     *   <li>Build a SparkConf, copying the IoRegistry setting and spark.kryo.registrator from
+     *       {@code conf} when present.</li>
+     *   <li>Create an {@link IoRegistryAwareKryoSerializer} from that SparkConf.</li>
+     *   <li>Add one {@code newKryo()} instance per configured Gryo pool slot, with a minimum of one.</li>
+     * </ol>
+     */
+    private static LinkedBlockingQueue<Kryo> initialize(Configuration conf) {
+        // DCL is safe in this case due to volatility
+        if (!initialized) {
+            synchronized (UnshadedKryoShimService.class) {
+                if (!initialized) {
+                    final SparkConf sparkConf = new SparkConf();
+
+                    // Copy the user's IoRegistry from the param conf to the SparkConf we just created
+                    final String regStr = conf.getString(GryoPool.CONFIG_IO_REGISTRY);
+                    if (null != regStr) { // SparkConf rejects null values with NPE, so this has to be checked before set(...)
+                        sparkConf.set(GryoPool.CONFIG_IO_REGISTRY, regStr);
+                    }
+                    // Setting spark.serializer here almost certainly isn't necessary, but it doesn't hurt
+                    sparkConf.set("spark.serializer", IoRegistryAwareKryoSerializer.class.getCanonicalName());
+
+                    final String registrator = conf.getString("spark.kryo.registrator");
+                    if (null != registrator) {
+                        sparkConf.set("spark.kryo.registrator", registrator);
+                        log.info("Copied spark.kryo.registrator: {}", registrator);
+                    } else {
+                        log.info("Not copying spark.kryo.registrator");
+                    }
+
+                    // Reuse Gryo poolsize for Kryo poolsize (no need to copy this to SparkConf)
+                    int poolSize = conf.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE,
+                            GryoPool.CONFIG_IO_GRYO_POOL_SIZE_DEFAULT);
+                    if (poolSize < 1) {
+                        // An empty pool would make every take() in borrow(...) block forever
+                        log.warn("{}={} is not a usable pool size; using 1 instead",
+                                GryoPool.CONFIG_IO_GRYO_POOL_SIZE, poolSize);
+                        poolSize = 1;
+                    }
+
+                    // Instantiate the spark.serializer and set up a pool backed by it
+                    final IoRegistryAwareKryoSerializer ioReg = new IoRegistryAwareKryoSerializer(sparkConf);
+                    for (int i = 0; i < poolSize; i++) {
+                        KRYOS.add(ioReg.newKryo());
+                    }
+
+                    initialized = true;
+                }
+            }
         }
-        return size;
+        return KRYOS;
     }
 }
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/9321a3e1/spark-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService b/spark-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService
new file mode 100644
index 0000000..68712a6
--- /dev/null
+++ b/spark-gremlin/src/main/resources/META-INF/services/org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService
@@ -0,0 +1 @@
+org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedKryoShimService # Supports Spark