TINKERPOP-1389 Support Spark 2.0
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/f0c5a5f1 Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/f0c5a5f1 Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/f0c5a5f1 Branch: refs/heads/master Commit: f0c5a5f133635215b96ba3f2b7431706e23938b3 Parents: 4c6e43d Author: yucx <[email protected]> Authored: Thu Sep 1 23:11:58 2016 -0700 Committer: Marko A. Rodriguez <[email protected]> Committed: Tue Nov 29 04:54:21 2016 -0700 ---------------------------------------------------------------------- giraph-gremlin/pom.xml | 2 +- hadoop-gremlin/pom.xml | 2 +- spark-gremlin/pom.xml | 13 +++++- .../spark/process/computer/SparkExecutor.java | 16 ++++---- .../SparkStarBarrierInterceptor.java | 10 ++--- .../spark/structure/io/gryo/GryoSerializer.java | 43 ++++++++++++++++---- 6 files changed, 59 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f0c5a5f1/giraph-gremlin/pom.xml ---------------------------------------------------------------------- diff --git a/giraph-gremlin/pom.xml b/giraph-gremlin/pom.xml index 102f3a9..25d96ec 100644 --- a/giraph-gremlin/pom.xml +++ b/giraph-gremlin/pom.xml @@ -127,7 +127,7 @@ limitations under the License. <dependency> <groupId>javax.servlet</groupId> <artifactId>javax.servlet-api</artifactId> - <version>3.0.1</version> + <version>3.1.0</version> </dependency> <!-- TEST --> <dependency> http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f0c5a5f1/hadoop-gremlin/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-gremlin/pom.xml b/hadoop-gremlin/pom.xml index 5416f18..46d318b 100644 --- a/hadoop-gremlin/pom.xml +++ b/hadoop-gremlin/pom.xml @@ -128,7 +128,7 @@ limitations under the License. 
<dependency> <groupId>javax.servlet</groupId> <artifactId>javax.servlet-api</artifactId> - <version>3.0.1</version> + <version>3.1.0</version> </dependency> <!-- TEST --> <dependency> http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f0c5a5f1/spark-gremlin/pom.xml ---------------------------------------------------------------------- diff --git a/spark-gremlin/pom.xml b/spark-gremlin/pom.xml index f68d76b..625579e 100644 --- a/spark-gremlin/pom.xml +++ b/spark-gremlin/pom.xml @@ -30,6 +30,11 @@ <name>Apache TinkerPop :: Spark Gremlin</name> <dependencies> <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>14.0.1</version> + </dependency> + <dependency> <groupId>org.apache.tinkerpop</groupId> <artifactId>gremlin-core</artifactId> <version>${project.version}</version> @@ -55,6 +60,10 @@ <artifactId>servlet-api</artifactId> </exclusion> <exclusion> + <groupId>javax.servlet</groupId> + <artifactId>javax.servlet-api</artifactId> + </exclusion> + <exclusion> <groupId>com.sun.jersey</groupId> <artifactId>jersey-core</artifactId> </exclusion> @@ -104,7 +113,7 @@ <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> - <version>1.6.1</version> + <version>2.0.0</version> <exclusions> <!-- self conflicts --> <exclusion> @@ -210,7 +219,7 @@ <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> - <version>2.4.4</version> + <version>2.6.5</version> </dependency> <dependency> <groupId>commons-lang</groupId> http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f0c5a5f1/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java index 
6e65e26..b8dfdd3 100644 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java @@ -18,7 +18,7 @@ */ package org.apache.tinkerpop.gremlin.spark.process.computer; -import com.google.common.base.Optional; +import org.apache.spark.api.java.Optional; import org.apache.commons.configuration.Configuration; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.function.Function2; @@ -65,7 +65,7 @@ public final class SparkExecutor { public static JavaPairRDD<Object, VertexWritable> applyGraphFilter(final JavaPairRDD<Object, VertexWritable> graphRDD, final GraphFilter graphFilter) { return graphRDD.mapPartitionsToPair(partitionIterator -> { final GraphFilter gFilter = graphFilter.clone(); - return () -> IteratorUtils.filter(partitionIterator, tuple -> (tuple._2().get().applyGraphFilter(gFilter)).isPresent()); + return IteratorUtils.filter(partitionIterator, tuple -> (tuple._2().get().applyGraphFilter(gFilter)).isPresent()); }, true); } @@ -101,7 +101,7 @@ public final class SparkExecutor { final SparkMessenger<M> messenger = new SparkMessenger<>(); workerVertexProgram.workerIterationStart(memory.asImmutable()); // start the worker - return () -> IteratorUtils.map(partitionIterator, vertexViewIncoming -> { + return IteratorUtils.map(partitionIterator, vertexViewIncoming -> { final StarGraph.StarVertex vertex = vertexViewIncoming._2()._1().get(); // get the vertex from the vertex writable final boolean hasViewAndMessages = vertexViewIncoming._2()._2().isPresent(); // if this is the first iteration, then there are no views or messages final List<DetachedVertexProperty<Object>> previousView = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getView() : memory.isInitialIteration() ? 
new ArrayList<>() : Collections.emptyList(); @@ -139,7 +139,7 @@ public final class SparkExecutor { ///////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////// final PairFlatMapFunction<Tuple2<Object, ViewOutgoingPayload<M>>, Object, Payload> messageFunction = - tuple -> () -> IteratorUtils.concat( + tuple -> IteratorUtils.concat( IteratorUtils.of(new Tuple2<>(tuple._1(), tuple._2().getView())), // emit the view payload IteratorUtils.map(tuple._2().getOutgoingMessages().iterator(), message -> new Tuple2<>(message._1(), new MessagePayload<>(message._2())))); final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(HadoopGraph.open(vertexProgramConfiguration), vertexProgramConfiguration).getMessageCombiner().orElse(null); @@ -178,7 +178,7 @@ public final class SparkExecutor { newViewIncomingRDD .foreachPartition(partitionIterator -> { KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration); - }); // need to complete a task so its BSP and the memory for this iteration is updated + }); // need to complete a task so its BSP and the memory for this iteration is updated return newViewIncomingRDD; } @@ -213,7 +213,7 @@ public final class SparkExecutor { final Configuration graphComputerConfiguration) { JavaPairRDD<K, V> mapRDD = graphRDD.mapPartitionsToPair(partitionIterator -> { KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration); - return () -> new MapIterator<>(MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(HadoopGraph.open(graphComputerConfiguration), graphComputerConfiguration), partitionIterator); + return new MapIterator<>(MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(HadoopGraph.open(graphComputerConfiguration), graphComputerConfiguration), partitionIterator); }); if (mapReduce.getMapKeySort().isPresent()) mapRDD = mapRDD.sortByKey(mapReduce.getMapKeySort().get(), true, 1); @@ -224,7 +224,7 @@ public final class 
SparkExecutor { final Configuration graphComputerConfiguration) { return mapRDD.mapPartitionsToPair(partitionIterator -> { KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration); - return () -> new CombineIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(graphComputerConfiguration), graphComputerConfiguration), partitionIterator); + return new CombineIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(graphComputerConfiguration), graphComputerConfiguration), partitionIterator); }); } @@ -233,7 +233,7 @@ public final class SparkExecutor { final Configuration graphComputerConfiguration) { JavaPairRDD<OK, OV> reduceRDD = mapOrCombineRDD.groupByKey().mapPartitionsToPair(partitionIterator -> { KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration); - return () -> new ReduceIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(graphComputerConfiguration), graphComputerConfiguration), partitionIterator); + return new ReduceIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(graphComputerConfiguration), graphComputerConfiguration), partitionIterator); }); if (mapReduce.getReduceKeySort().isPresent()) reduceRDD = reduceRDD.sortByKey(mapReduce.getReduceKeySort().get(), true, 1); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f0c5a5f1/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java index 18a9f1e..59950da 100644 --- 
a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java @@ -85,13 +85,11 @@ public final class SparkStarBarrierInterceptor implements SparkVertexProgramInte .filter(vertexWritable -> ElementHelper.idExists(vertexWritable.get().id(), graphStepIds)) // ensure vertex ids are in V(x) .flatMap(vertexWritable -> { if (identityTraversal) // g.V.count()-style (identity) - return () -> IteratorUtils.of(traversal.getTraverserGenerator().generate(vertexWritable.get(), (Step) graphStep, 1l)); + return IteratorUtils.of(traversal.getTraverserGenerator().generate(vertexWritable.get(), (Step) graphStep, 1l)); else { // add the vertex to head of the traversal - return () -> { // and iterate it for its results - final Traversal.Admin<Vertex, ?> clone = traversal.clone(); // need a unique clone for each vertex to isolate the computation + final Traversal.Admin<Vertex, ?> clone = traversal.clone(); // need a unique clone for each vertex to isolate the computation clone.getStartStep().addStart(clone.getTraverserGenerator().generate(vertexWritable.get(), graphStep, 1l)); return (Step) clone.getEndStep(); - }; } }); // USE SPARK DSL FOR THE RESPECTIVE END REDUCING BARRIER STEP OF THE TRAVERSAL @@ -133,14 +131,14 @@ public final class SparkStarBarrierInterceptor implements SparkVertexProgramInte result = ((GroupStep) endStep).generateFinalResult(nextRDD. 
mapPartitions(partitions -> { final GroupStep<Object, Object, Object> clone = (GroupStep) endStep.clone(); - return () -> IteratorUtils.map(partitions, clone::projectTraverser); + return IteratorUtils.map(partitions, clone::projectTraverser); }).fold(((GroupStep<Object, Object, Object>) endStep).getSeedSupplier().get(), biOperator::apply)); } else if (endStep instanceof GroupCountStep) { final GroupCountStep.GroupCountBiOperator<Object> biOperator = GroupCountStep.GroupCountBiOperator.instance(); result = nextRDD .mapPartitions(partitions -> { final GroupCountStep<Object, Object> clone = (GroupCountStep) endStep.clone(); - return () -> IteratorUtils.map(partitions, clone::projectTraverser); + return IteratorUtils.map(partitions, clone::projectTraverser); }) .fold(((GroupCountStep<Object, Object>) endStep).getSeedSupplier().get(), biOperator::apply); } else http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f0c5a5f1/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java index 28a4d55..6735fe5 100644 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java @@ -24,7 +24,7 @@ import org.apache.commons.configuration.BaseConfiguration; import org.apache.commons.configuration.Configuration; import org.apache.spark.SparkConf; import org.apache.spark.api.python.PythonBroadcast; -import org.apache.spark.broadcast.HttpBroadcast; +import org.apache.spark.broadcast.TorrentBroadcast; import org.apache.spark.network.util.ByteUnit; import org.apache.spark.scheduler.CompressedMapStatus; import 
org.apache.spark.scheduler.HighlyCompressedMapStatus; @@ -48,24 +48,32 @@ import scala.Tuple3; import scala.collection.mutable.WrappedArray; import scala.runtime.BoxedUnit; +import java.io.Serializable; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ -public final class GryoSerializer extends Serializer { +public final class GryoSerializer extends Serializer implements Serializable { //private final Option<String> userRegistrator; private final int bufferSize; private final int maxBufferSize; + private final int poolSize; + private final ArrayList<String> ioRegList = new ArrayList<>(); + private final boolean referenceTracking; + private final boolean registrationRequired; - private final GryoPool gryoPool; + + private transient GryoPool gryoPool; public GryoSerializer(final SparkConf sparkConfiguration) { final long bufferSizeKb = sparkConfiguration.getSizeAsKb("spark.kryoserializer.buffer", "64k"); final long maxBufferSizeMb = sparkConfiguration.getSizeAsMb("spark.kryoserializer.buffer.max", "64m"); - final boolean referenceTracking = sparkConfiguration.getBoolean("spark.kryo.referenceTracking", true); - final boolean registrationRequired = sparkConfiguration.getBoolean("spark.kryo.registrationRequired", false); + referenceTracking = sparkConfiguration.getBoolean("spark.kryo.referenceTracking", true); + registrationRequired = sparkConfiguration.getBoolean("spark.kryo.registrationRequired", false); if (bufferSizeKb >= ByteUnit.GiB.toKiB(2L)) { throw new IllegalArgumentException("spark.kryoserializer.buffer must be less than 2048 mb, got: " + bufferSizeKb + " mb."); } else { @@ -77,9 +85,19 @@ public final class GryoSerializer extends Serializer { //this.userRegistrator = sparkConfiguration.getOption("spark.kryo.registrator"); } } - this.gryoPool = GryoPool.build(). 
- poolSize(sparkConfiguration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, GryoPool.CONFIG_IO_GRYO_POOL_SIZE_DEFAULT)). - ioRegistries(makeApacheConfiguration(sparkConfiguration).getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())). + poolSize = sparkConfiguration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, GryoPool.CONFIG_IO_GRYO_POOL_SIZE_DEFAULT); + List<Object> list = makeApacheConfiguration(sparkConfiguration).getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList()); + list.forEach(c -> { + ioRegList.add(c.toString()); + } + ); + } + + private GryoPool createPool(){ + List<Object> list = new ArrayList<>(ioRegList); + return GryoPool.build(). + poolSize(poolSize). + ioRegistries(list). initializeMapper(builder -> { try { builder.addCustom(Tuple2.class, new Tuple2Serializer()) @@ -91,7 +109,7 @@ public final class GryoSerializer extends Serializer { .addCustom(CompressedMapStatus.class) .addCustom(BlockManagerId.class) .addCustom(HighlyCompressedMapStatus.class, new ExternalizableSerializer()) // externalizable implemented so its okay - .addCustom(HttpBroadcast.class) + .addCustom(TorrentBroadcast.class) .addCustom(PythonBroadcast.class) .addCustom(BoxedUnit.class) .addCustom(Class.forName("scala.reflect.ClassTag$$anon$1"), new JavaSerializer()) @@ -118,6 +136,13 @@ public final class GryoSerializer extends Serializer { } public GryoPool getGryoPool() { + if (gryoPool == null) { + synchronized (this) { + if (gryoPool == null) { + gryoPool = createPool(); + } + } + } return this.gryoPool; }
