Worked with @dalaro to fix a bug in HadoopPoolShimService. Reverted my last work on VertexProgramHelper.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/90e31599 Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/90e31599 Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/90e31599 Branch: refs/heads/tp31 Commit: 90e3159969363e6a3383ffc64d58c27f76384a55 Parents: 797364c Author: Marko A. Rodriguez <okramma...@gmail.com> Authored: Mon Jun 6 16:55:42 2016 -0600 Committer: Marko A. Rodriguez <okramma...@gmail.com> Committed: Mon Jun 6 16:55:42 2016 -0600 ---------------------------------------------------------------------- .../computer/util/VertexProgramHelper.java | 33 +++++--------------- .../io/gryo/kryoshim/KryoShimServiceLoader.java | 17 +++++----- .../structure/io/HadoopPoolShimService.java | 2 +- .../gremlin/hadoop/HadoopGraphProvider.java | 2 +- .../spark/process/computer/SparkExecutor.java | 11 +++---- 5 files changed, 23 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/90e31599/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/VertexProgramHelper.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/VertexProgramHelper.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/VertexProgramHelper.java index 2b3a0b2..bc67866 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/VertexProgramHelper.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/VertexProgramHelper.java @@ -25,13 +25,8 @@ import org.apache.tinkerpop.gremlin.process.traversal.Step; import org.apache.tinkerpop.gremlin.process.traversal.Traversal; import 
org.apache.tinkerpop.gremlin.process.traversal.step.map.EdgeVertexStep; import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep; -import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoPool; -import org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimServiceLoader; import org.apache.tinkerpop.gremlin.util.Serializer; -import org.apache.tinkerpop.shaded.kryo.io.Input; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.Arrays; import java.util.HashSet; @@ -42,8 +37,6 @@ import java.util.Set; */ public final class VertexProgramHelper { - private static final GryoPool GRYO_POOL = GryoPool.build().create(); - private VertexProgramHelper() { } @@ -74,33 +67,21 @@ public final class VertexProgramHelper { final String byteString = Arrays.toString(Serializer.serializeObject(object)); configuration.setProperty(key, byteString.substring(1, byteString.length() - 1)); } catch (final IOException e) { - try { - final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - GRYO_POOL.doWithWriter(kryo -> kryo.writeObject(outputStream, object)); - String byteString = Arrays.toString(outputStream.toByteArray()); - configuration.setProperty(key, byteString.substring(1, byteString.length() - 1)); - } catch (final Exception e1) { - throw new IllegalArgumentException(e1.getMessage(), e1); - } + throw new IllegalArgumentException(e.getMessage(), e); } } public static <T> T deserialize(final Configuration configuration, final String key) { - final String[] stringBytes = configuration.getString(key).split(","); - byte[] bytes = new byte[stringBytes.length]; - for (int i = 0; i < stringBytes.length; i++) { - bytes[i] = Byte.valueOf(stringBytes[i].trim()); - } try { + final String[] stringBytes = configuration.getString(key).split(","); + byte[] bytes = new byte[stringBytes.length]; + for (int i = 0; i < stringBytes.length; i++) { + bytes[i] = 
Byte.valueOf(stringBytes[i].trim()); + } return (T) Serializer.deserializeObject(bytes); } catch (final IOException | ClassNotFoundException e) { - try { - return (T) GRYO_POOL.readWithKryo(kryo -> kryo.readClassAndObject(new Input(new ByteArrayInputStream(bytes)))); - } catch (final Exception e1) { - throw new IllegalArgumentException(e1.getMessage(), e1); - } + throw new IllegalArgumentException(e.getMessage(), e); } - } public static <S, E> Traversal.Admin<S, E> reverse(final Traversal.Admin<S, E> traversal) { http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/90e31599/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java ---------------------------------------------------------------------- diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java index 9184dd0..fd57a3c 100644 --- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java +++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/gryo/kryoshim/KryoShimServiceLoader.java @@ -51,6 +51,7 @@ public class KryoShimServiceLoader { public static void applyConfiguration(Configuration conf) { KryoShimServiceLoader.conf = conf; + load(true); } /** @@ -195,20 +196,20 @@ public class KryoShimServiceLoader { if (0 == result) { log.warn("Found two {} implementations with the same canonical classname: {}. 
" + - "This may indicate a problem with the classpath/classloader such as " + - "duplicate or conflicting copies of the file " + - "META-INF/services/org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService.", - a.getClass().getCanonicalName()); + "This may indicate a problem with the classpath/classloader such as " + + "duplicate or conflicting copies of the file " + + "META-INF/services/org.apache.tinkerpop.gremlin.structure.io.gryo.kryoshim.KryoShimService.", + a.getClass().getCanonicalName()); } else { String winner = 0 < result ? a.getClass().getCanonicalName() : b.getClass().getCanonicalName(); log.warn("{} implementations {} and {} are tied with priority value {}. " + - "Preferring {} to the other because it has a lexicographically greater classname. " + - "Consider setting the system property \"{}\" instead of relying on priority tie-breaking.", - KryoShimService.class.getSimpleName(), a, b, ap, winner, SHIM_CLASS_SYSTEM_PROPERTY); + "Preferring {} to the other because it has a lexicographically greater classname. 
" + + "Consider setting the system property \"{}\" instead of relying on priority tie-breaking.", + KryoShimService.class.getSimpleName(), a, b, ap, winner, SHIM_CLASS_SYSTEM_PROPERTY); } return result; } } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/90e31599/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java ---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java index 5753d90..df72b71 100644 --- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java +++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopPoolShimService.java @@ -71,6 +71,6 @@ public class HadoopPoolShimService implements KryoShimService { @Override public void applyConfiguration(Configuration conf) { - KryoShimServiceLoader.applyConfiguration(conf); + HadoopPools.initialize(conf); } } http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/90e31599/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java ---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java index 57157db..e36c08d 100644 --- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java +++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java @@ -112,7 +112,7 @@ public class HadoopGraphProvider extends AbstractGraphProvider { @Override public Map<String, Object> getBaseConfiguration(final 
String graphName, final Class<?> test, final String testMethodName, final LoadGraphWith.GraphData loadGraphWith) { - System.setProperty(SHIM_CLASS_SYSTEM_PROPERTY, HadoopPoolShimService.class.getCanonicalName()); + System.clearProperty(SHIM_CLASS_SYSTEM_PROPERTY); this.graphSONInput = RANDOM.nextBoolean(); return new HashMap<String, Object>() {{ put(Graph.GRAPH, HadoopGraph.class.getName()); http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/90e31599/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java index 9e5ac53..4db8086 100644 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java @@ -24,7 +24,6 @@ import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFlatMapFunction; import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph; -import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools; import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable; import org.apache.tinkerpop.gremlin.process.computer.GraphFilter; import org.apache.tinkerpop.gremlin.process.computer.MapReduce; @@ -171,7 +170,7 @@ public final class SparkExecutor { assert graphRDD.partitioner().get().equals(newViewIncomingRDD.partitioner().get()); newViewIncomingRDD .foreachPartition(partitionIterator -> { - HadoopPools.initialize(apacheConfiguration); + KryoShimServiceLoader.applyConfiguration(apacheConfiguration); }); // need to complete a task so its BSP and the memory for this iteration is 
updated return newViewIncomingRDD; } @@ -206,7 +205,7 @@ public final class SparkExecutor { final JavaPairRDD<Object, VertexWritable> graphRDD, final MapReduce<K, V, ?, ?, ?> mapReduce, final Configuration apacheConfiguration) { JavaPairRDD<K, V> mapRDD = graphRDD.mapPartitionsToPair(partitionIterator -> { - HadoopPools.initialize(apacheConfiguration); + KryoShimServiceLoader.applyConfiguration(apacheConfiguration); return () -> new MapIterator<>(MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration), partitionIterator); }); if (mapReduce.getMapKeySort().isPresent()) @@ -217,7 +216,7 @@ public final class SparkExecutor { public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeCombine(final JavaPairRDD<K, V> mapRDD, final Configuration apacheConfiguration) { return mapRDD.mapPartitionsToPair(partitionIterator -> { - HadoopPools.initialize(apacheConfiguration); + KryoShimServiceLoader.applyConfiguration(apacheConfiguration); return () -> new CombineIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration), partitionIterator); }); } @@ -226,11 +225,11 @@ public final class SparkExecutor { final JavaPairRDD<K, V> mapOrCombineRDD, final MapReduce<K, V, OK, OV, ?> mapReduce, final Configuration apacheConfiguration) { JavaPairRDD<OK, OV> reduceRDD = mapOrCombineRDD.groupByKey().mapPartitionsToPair(partitionIterator -> { - HadoopPools.initialize(apacheConfiguration); + KryoShimServiceLoader.applyConfiguration(apacheConfiguration); return () -> new ReduceIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration), partitionIterator); }); if (mapReduce.getReduceKeySort().isPresent()) reduceRDD = reduceRDD.sortByKey(mapReduce.getReduceKeySort().get(), true, 1); return reduceRDD; } -} +} \ No newline at end of file