Really simplified UnshadedKryoShimService and IoRegistryAwareKryoSerializer. Also introduced a synchronization point in KryoShimServiceLoader.applyConfiguration(), as I believe multiple threads were creating the service over and over again. Hopefully this doesn't create a bottleneck. Going to test on the cluster.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/a0fa7c6e Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/a0fa7c6e Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/a0fa7c6e Branch: refs/heads/master Commit: a0fa7c6e5e6c9ee41aedf7e311e9f3f75a6dd8c5 Parents: 68ccbb2 Author: Marko A. Rodriguez <[email protected]> Authored: Wed Oct 26 10:13:40 2016 -0600 Committer: Marko A. Rodriguez <[email protected]> Committed: Tue Nov 29 04:57:14 2016 -0700 ---------------------------------------------------------------------- .../io/gryo/kryoshim/KryoShimServiceLoader.java | 8 +-- .../io/gryo/IoRegistryAwareKryoSerializer.java | 70 ++++---------------- .../unshaded/UnshadedKryoShimService.java | 54 +++++---------- .../SparkHadoopGraphGryoSerializerProvider.java | 7 +- .../computer/SparkHadoopGraphProvider.java | 7 +- .../structure/io/SparkContextStorageCheck.java | 11 +-- 6 files changed, 48 insertions(+), 109 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a0fa7c6e/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java index 0051204..5f50f9e 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java @@ -48,13 +48,13 @@ public class KryoShimServiceLoader { */ public static final String KRYO_SHIM_SERVICE = "gremlin.io.kryoShimService"; - public static void 
applyConfiguration(final Configuration configuration) { + public synchronized static void applyConfiguration(final Configuration configuration) { if (null == KryoShimServiceLoader.configuration || + null == KryoShimServiceLoader.cachedShimService || !ConfigurationUtils.toString(KryoShimServiceLoader.configuration).equals(ConfigurationUtils.toString(configuration))) { KryoShimServiceLoader.configuration = configuration; load(true); - } else - load(false); + } } /** @@ -114,7 +114,7 @@ public class KryoShimServiceLoader { throw new IllegalStateException("Unable to load KryoShimService"); // once the shim service is defined, configure it - log.info("Configuring KryoShimService {} with following configuration: {}", + log.info("Configuring KryoShimService {} with following configuration:\n####################\n{}\n####################", cachedShimService.getClass().getCanonicalName(), ConfigurationUtils.toString(configuration)); cachedShimService.applyConfiguration(configuration); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a0fa7c6e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java index bf71fae..6d9b536 100644 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/IoRegistryAwareKryoSerializer.java @@ -25,92 +25,48 @@ package org.apache.tinkerpop.gremlin.spark.structure.io.gryo; import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.Serializer; import org.apache.spark.SparkConf; import 
org.apache.spark.serializer.KryoSerializer; +import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded.UnshadedSerializerAdapter; import org.apache.tinkerpop.gremlin.structure.io.IoRegistry; -import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoIo; import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool; -import org.javatuples.Pair; +import org.apache.tinkerpop.gremlin.structure.io.gryo.TypeRegistration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.List; +import java.util.Arrays; /** * A {@link KryoSerializer} that attempts to honor {@link GryoPool#CONFIG_IO_REGISTRY}. */ public class IoRegistryAwareKryoSerializer extends KryoSerializer { - private final SparkConf conf; + private final SparkConf configuration; private static final Logger log = LoggerFactory.getLogger(IoRegistryAwareKryoSerializer.class); - public IoRegistryAwareKryoSerializer(final SparkConf conf) { - super(conf); + public IoRegistryAwareKryoSerializer(final SparkConf configuration) { + super(configuration); // store conf so that we can access its registry (if one is present) in newKryo() - this.conf = conf; + this.configuration = configuration; } @Override public Kryo newKryo() { final Kryo kryo = super.newKryo(); - return applyIoRegistryIfPresent(kryo); } private Kryo applyIoRegistryIfPresent(final Kryo kryo) { - if (!conf.contains(GryoPool.CONFIG_IO_REGISTRY)) { - log.info("SparkConf {} does not contain setting {}, skipping {} handling", - GryoPool.CONFIG_IO_REGISTRY, conf, IoRegistry.class.getCanonicalName()); + if (!this.configuration.contains(GryoPool.CONFIG_IO_REGISTRY)) { + log.info("SparkConf does not contain setting {}, skipping {} handling", GryoPool.CONFIG_IO_REGISTRY, IoRegistry.class.getCanonicalName()); return kryo; } - - final String registryClassnames = conf.get(GryoPool.CONFIG_IO_REGISTRY); - - for (String registryClassname : registryClassnames.split(",")) { - final IoRegistry registry; - - try { - registry = 
(IoRegistry) Class.forName(registryClassname).newInstance(); - log.info("Instantiated {}", registryClassname); - } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) { - log.error("Unable to reflectively instantiate the {} implementation named {}", - IoRegistry.class.getCanonicalName(), registryClassname, e); - return kryo; - } - - // Left is the class targeted for serialization, right is a mess of potential types, including - // a shaded Serializer impl, unshaded Serializer impl, or Function<shaded.Kryo,shaded.Serializer> - final List<Pair<Class, Object>> serializers = registry.find(GryoIo.class); - - if (null == serializers) { - log.info("Invoking find({}.class) returned null on registry {}; ignoring this registry", - GryoIo.class.getCanonicalName(), registry); - return kryo; - } - - for (Pair<Class, Object> p : serializers) { - if (null == p.getValue1()) { - // null on the right is fine - log.info("Registering {} with default serializer", p.getValue0()); - kryo.register(p.getValue0()); - } else if (p.getValue1() instanceof Serializer) { - // unshaded serializer on the right is fine - log.info("Registering {} with serializer {}", p.getValue0(), p.getValue1()); - kryo.register(p.getValue0(), (Serializer) p.getValue1()); - } else { - // anything else on the right is unsupported with Spark - log.error("Serializer {} found in {} must implement {} " + - "(the shaded interface {} is not supported on Spark). 
This class will be registered with " + - "the default behavior of Spark's KryoSerializer.", - p.getValue1(), registryClassname, Serializer.class.getCanonicalName(), - org.apache.tinkerpop.shaded.kryo.Serializer.class.getCanonicalName()); - kryo.register(p.getValue0()); - } - } + final GryoPool pool = GryoPool.build().poolSize(1).ioRegistries(Arrays.asList(this.configuration.get(GryoPool.CONFIG_IO_REGISTRY).split(","))).create(); + for (final TypeRegistration<?> type : pool.getMapper().getTypeRegistrations()) { + log.info("Registering {} with serializer {} and id {}", type.getTargetClass().getCanonicalName(), type.getSerializerShim(), type.getId()); + kryo.register(type.getTargetClass(), new UnshadedSerializerAdapter<>(type.getSerializerShim()), type.getId()); } - return kryo; } } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a0fa7c6e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java index 2b0efda..4932acb 100644 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/kryoshim/unshaded/UnshadedKryoShimService.java @@ -27,15 +27,13 @@ package org.apache.tinkerpop.gremlin.spark.structure.io.gryo.kryoshim.unshaded; import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; -import org.apache.commons.configuration.BaseConfiguration; import org.apache.commons.configuration.Configuration; import org.apache.spark.SparkConf; 
import org.apache.tinkerpop.gremlin.hadoop.Constants; +import org.apache.tinkerpop.gremlin.spark.structure.Spark; import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.IoRegistryAwareKryoSerializer; import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool; import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.InputStream; import java.io.OutputStream; @@ -43,22 +41,20 @@ import java.util.concurrent.LinkedBlockingQueue; public class UnshadedKryoShimService implements KryoShimService { - private static final Logger log = LoggerFactory.getLogger(UnshadedKryoShimService.class); private static final LinkedBlockingQueue<Kryo> KRYOS = new LinkedBlockingQueue<>(); - private static volatile boolean initialized; + private static volatile boolean INITIALIZED; @Override public Object readClassAndObject(final InputStream inputStream) { - final LinkedBlockingQueue<Kryo> kryos = initialize(); Kryo k = null; try { - k = kryos.take(); + k = KRYOS.take(); return k.readClassAndObject(new Input(inputStream)); } catch (final InterruptedException e) { throw new IllegalStateException(e); } finally { try { - kryos.put(k); + KRYOS.put(k); } catch (final InterruptedException e) { throw new IllegalStateException(e); } @@ -67,10 +63,9 @@ public class UnshadedKryoShimService implements KryoShimService { @Override public void writeClassAndObject(final Object object, OutputStream outputStream) { - final LinkedBlockingQueue<Kryo> kryos = initialize(); Kryo k = null; try { - k = kryos.take(); + k = KRYOS.take(); final Output kryoOutput = new Output(outputStream); k.writeClassAndObject(kryoOutput, object); kryoOutput.flush(); @@ -78,7 +73,7 @@ public class UnshadedKryoShimService implements KryoShimService { throw new IllegalStateException(e); } finally { try { - kryos.put(k); + KRYOS.put(k); } catch (final InterruptedException e) { throw new IllegalStateException(e); } @@ -95,44 +90,25 @@ 
public class UnshadedKryoShimService implements KryoShimService { initialize(configuration); } - private LinkedBlockingQueue<Kryo> initialize() { - return initialize(new BaseConfiguration()); - } - private LinkedBlockingQueue<Kryo> initialize(final Configuration configuration) { // DCL is safe in this case due to volatility - if (!initialized) { + if (!INITIALIZED) { synchronized (UnshadedKryoShimService.class) { - if (!initialized) { - final SparkConf sparkConf = new SparkConf(); - - // Copy the user's IoRegistry from the param conf to the SparkConf we just created - final String regStr = configuration.getString(GryoPool.CONFIG_IO_REGISTRY, null); - if (null != regStr) // SparkConf rejects null values with NPE, so this has to be checked before set(...) - sparkConf.set(GryoPool.CONFIG_IO_REGISTRY, regStr); - + if (!INITIALIZED) { + // so we don't get a WARN that a new configuration is being created within an active context + final SparkConf sparkConf = null == Spark.getContext() ? new SparkConf() : Spark.getContext().getConf().clone(); + configuration.getKeys().forEachRemaining(key -> sparkConf.set(key, configuration.getProperty(key).toString())); // Setting spark.serializer here almost certainly isn't necessary, but it doesn't hurt sparkConf.set(Constants.SPARK_SERIALIZER, IoRegistryAwareKryoSerializer.class.getCanonicalName()); - final String registrator = configuration.getString(Constants.SPARK_KRYO_REGISTRATOR); - if (null != registrator) { - sparkConf.set(Constants.SPARK_KRYO_REGISTRATOR, registrator); - log.info("Copied " + Constants.SPARK_KRYO_REGISTRATOR + ": {}", registrator); - } else { - log.info("Not copying " + Constants.SPARK_KRYO_REGISTRATOR); - } - // Instantiate the spark.serializer - final IoRegistryAwareKryoSerializer ioReg = new IoRegistryAwareKryoSerializer(sparkConf); - + final IoRegistryAwareKryoSerializer ioRegistrySerializer = new IoRegistryAwareKryoSerializer(sparkConf); // Setup a pool backed by our spark.serializer instance // Reuse Gryo 
poolsize for Kryo poolsize (no need to copy this to SparkConf) - final int poolSize = configuration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, - GryoPool.CONFIG_IO_GRYO_POOL_SIZE_DEFAULT); + final int poolSize = configuration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, GryoPool.CONFIG_IO_GRYO_POOL_SIZE_DEFAULT); for (int i = 0; i < poolSize; i++) { - KRYOS.add(ioReg.newKryo()); + KRYOS.add(ioRegistrySerializer.newKryo()); } - - initialized = true; + INITIALIZED = true; } } } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a0fa7c6e/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphGryoSerializerProvider.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphGryoSerializerProvider.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphGryoSerializerProvider.java index 19b9121..0e7fe0d 100644 --- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphGryoSerializerProvider.java +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphGryoSerializerProvider.java @@ -21,10 +21,8 @@ package org.apache.tinkerpop.gremlin.spark.process.computer; import org.apache.tinkerpop.gremlin.LoadGraphWith; import org.apache.tinkerpop.gremlin.hadoop.Constants; -import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPoolShimService; import org.apache.tinkerpop.gremlin.spark.structure.Spark; import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer; -import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader; import java.util.Map; @@ -34,7 +32,10 @@ import java.util.Map; public final class SparkHadoopGraphGryoSerializerProvider extends SparkHadoopGraphProvider { public Map<String, Object> getBaseConfiguration(final String 
graphName, final Class<?> test, final String testMethodName, final LoadGraphWith.GraphData loadGraphWith) { - Spark.close(); + if (!SparkHadoopGraphGryoSerializerProvider.class.getCanonicalName().equals(System.getProperty(PREVIOUS_SPARK_PROVIDER, null))) { + Spark.close(); + System.setProperty(PREVIOUS_SPARK_PROVIDER, SparkHadoopGraphGryoSerializerProvider.class.getCanonicalName()); + } final Map<String, Object> config = super.getBaseConfiguration(graphName, test, testMethodName, loadGraphWith); config.put(Constants.SPARK_SERIALIZER, GryoSerializer.class.getCanonicalName()); config.remove(Constants.SPARK_KRYO_REGISTRATOR); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a0fa7c6e/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java index 878fd1e..8385610 100644 --- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java @@ -52,9 +52,14 @@ import java.util.Map; @GraphProvider.Descriptor(computer = SparkGraphComputer.class) public class SparkHadoopGraphProvider extends HadoopGraphProvider { + protected static final String PREVIOUS_SPARK_PROVIDER = "previous.spark.provider"; + @Override public Map<String, Object> getBaseConfiguration(final String graphName, final Class<?> test, final String testMethodName, final LoadGraphWith.GraphData loadGraphWith) { - Spark.close(); + if (this.getClass().equals(SparkHadoopGraphProvider.class) && !SparkHadoopGraphProvider.class.getCanonicalName().equals(System.getProperty(PREVIOUS_SPARK_PROVIDER, 
null))) { + Spark.close(); + System.setProperty(PREVIOUS_SPARK_PROVIDER, SparkHadoopGraphProvider.class.getCanonicalName()); + } final Map<String, Object> config = super.getBaseConfiguration(graphName, test, testMethodName, loadGraphWith); config.put(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true); // this makes the test suite go really fast http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a0fa7c6e/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorageCheck.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorageCheck.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorageCheck.java index f9e5172..614b7b9 100644 --- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorageCheck.java +++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorageCheck.java @@ -19,6 +19,7 @@ package org.apache.tinkerpop.gremlin.spark.structure.io; +import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.storage.StorageLevel; import org.apache.tinkerpop.gremlin.LoadGraphWith; @@ -52,7 +53,7 @@ public class SparkContextStorageCheck extends AbstractStorageCheck { @Test @LoadGraphWith(LoadGraphWith.GraphData.MODERN) public void shouldSupportHeadMethods() throws Exception { - final Storage storage = SparkContextStorage.open("local[4]"); + final Storage storage = SparkContextStorage.open(graph.configuration()); final String outputLocation = graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION); super.checkHeadMethods(storage, graph.configuration().getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION), outputLocation, PersistedInputRDD.class, PersistedInputRDD.class); } @@ -60,7 +61,7 @@ public class SparkContextStorageCheck 
extends AbstractStorageCheck { @Test @LoadGraphWith(LoadGraphWith.GraphData.MODERN) public void shouldSupportRemoveAndListMethods() throws Exception { - final Storage storage = SparkContextStorage.open("local[4]"); + final Storage storage = SparkContextStorage.open(graph.configuration()); final String outputLocation = graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION); super.checkRemoveAndListMethods(storage, outputLocation); } @@ -68,7 +69,7 @@ public class SparkContextStorageCheck extends AbstractStorageCheck { @Test @LoadGraphWith(LoadGraphWith.GraphData.MODERN) public void shouldSupportCopyMethods() throws Exception { - final Storage storage = SparkContextStorage.open("local[4]"); + final Storage storage = SparkContextStorage.open(graph.configuration()); final String outputLocation = graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION); final String newOutputLocation = "new-location-for-copy"; super.checkCopyMethods(storage, outputLocation, newOutputLocation, PersistedInputRDD.class, PersistedInputRDD.class); @@ -77,14 +78,14 @@ public class SparkContextStorageCheck extends AbstractStorageCheck { @Test @LoadGraphWith(LoadGraphWith.GraphData.MODERN) public void shouldNotHaveResidualDataInStorage() throws Exception { - final Storage storage = SparkContextStorage.open("local[4]"); + final Storage storage = SparkContextStorage.open(graph.configuration()); final String outputLocation = graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION); super.checkResidualDataInStorage(storage, outputLocation); } @Test public void shouldSupportDirectoryFileDistinction() throws Exception { - final Storage storage = SparkContextStorage.open("local[4]"); + final Storage storage = SparkContextStorage.open(graph.configuration()); for (int i = 0; i < 10; i++) { JavaSparkContext.fromSparkContext(Spark.getContext()).emptyRDD().setName("directory1/file1-" + i + ".txt.bz").persist(StorageLevel.DISK_ONLY()); }
