Merge branch 'tp32' into tp33

Conflicts:
    spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
    spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java

Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/3891777e Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/3891777e Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/3891777e Branch: refs/heads/tp33 Commit: 3891777e4b30665bd47a5ead9e50871f37f7e9d8 Parents: a708cc3 bd85e5f Author: Stephen Mallette <sp...@genoprime.com> Authored: Tue May 22 07:08:22 2018 -0400 Committer: Stephen Mallette <sp...@genoprime.com> Committed: Tue May 22 07:08:22 2018 -0400 ---------------------------------------------------------------------- CHANGELOG.asciidoc | 1 + .../process/computer/SparkGraphComputer.java | 104 ++++++++++++++++--- 2 files changed, 93 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3891777e/CHANGELOG.asciidoc ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3891777e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java ---------------------------------------------------------------------- diff --cc spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java index dafe613,4c896cd..5184db6 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java @@@ -33,9 -33,7 +33,9 @@@ import org.apache.spark.Partitioner import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.launcher.SparkLauncher; ++import org.apache.spark.serializer.KryoRegistrator; +import org.apache.spark.serializer.KryoSerializer; + import org.apache.spark.serializer.Serializer; import 
org.apache.spark.storage.StorageLevel; import org.apache.tinkerpop.gremlin.hadoop.Constants; import org.apache.tinkerpop.gremlin.hadoop.process.computer.AbstractHadoopGraphComputer; @@@ -87,7 -78,16 +87,17 @@@ import java.util.concurrent.Executors import java.util.concurrent.Future; import java.util.concurrent.ThreadFactory; + import static org.apache.tinkerpop.gremlin.hadoop.Constants.GREMLIN_SPARK_GRAPH_STORAGE_LEVEL; + import static org.apache.tinkerpop.gremlin.hadoop.Constants.GREMLIN_SPARK_PERSIST_CONTEXT; + import static org.apache.tinkerpop.gremlin.hadoop.Constants.GREMLIN_SPARK_PERSIST_STORAGE_LEVEL; + import static org.apache.tinkerpop.gremlin.hadoop.Constants.GREMLIN_SPARK_SKIP_GRAPH_CACHE; + import static org.apache.tinkerpop.gremlin.hadoop.Constants.GREMLIN_SPARK_SKIP_PARTITIONER; ++import static org.apache.tinkerpop.gremlin.hadoop.Constants.SPARK_KRYO_REGISTRATION_REQUIRED; + import static org.apache.tinkerpop.gremlin.hadoop.Constants.SPARK_SERIALIZER; + /** + * {@link GraphComputer} implementation for Apache Spark. + * * @author Marko A. Rodriguez (http://markorodriguez.com) */ public final class SparkGraphComputer extends AbstractHadoopGraphComputer { @@@ -116,10 -112,15 +126,14 @@@ public SparkGraphComputer(final HadoopGraph hadoopGraph) { super(hadoopGraph); this.sparkConfiguration = new HadoopConfiguration(); - ConfigurationUtils.copy(this.hadoopGraph.configuration(), this.sparkConfiguration); } + /** + * Sets the number of workers. If the {@code spark.master} configuration is configured with "local" then it will + * change that configuration to use the specified number of worker threads. 
+ */ @Override - public GraphComputer workers(final int workers) { + public SparkGraphComputer workers(final int workers) { super.workers(workers); if (this.sparkConfiguration.containsKey(SparkLauncher.SPARK_MASTER) && this.sparkConfiguration.getString(SparkLauncher.SPARK_MASTER).startsWith("local")) { this.sparkConfiguration.setProperty(SparkLauncher.SPARK_MASTER, "local[" + this.workers + "]"); @@@ -134,6 -135,56 +148,72 @@@ return this; } + /** + * Sets the configuration option for {@code spark.master} which is the cluster manager to connect to which may be + * one of the <a href="https://spark.apache.org/docs/latest/submitting-applications.html#master-urls">allowed master URLs</a>. + */ + public SparkGraphComputer master(final String clusterManager) { + return configure(SparkLauncher.SPARK_MASTER, clusterManager); + } + + /** + * Determines if the Spark context should be left open preventing Spark from garbage collecting unreferenced RDDs. + */ + public SparkGraphComputer persistContext(final boolean persist) { + return configure(GREMLIN_SPARK_PERSIST_CONTEXT, persist); + } + + /** + * Specifies the method by which the {@link VertexProgram} created graph is persisted. By default, it is configured + * to use {@code StorageLevel#MEMORY_ONLY()} + */ + public SparkGraphComputer graphStorageLevel(final StorageLevel storageLevel) { + return configure(GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, storageLevel.description()); + } + + public SparkGraphComputer persistStorageLevel(final StorageLevel storageLevel) { + return configure(GREMLIN_SPARK_PERSIST_STORAGE_LEVEL, storageLevel.description()); + } + + /** + * Determines if the graph RDD should be partitioned or not. By default, this value is {@code false}. + */ + public SparkGraphComputer skipPartitioner(final boolean skip) { + return configure(GREMLIN_SPARK_SKIP_PARTITIONER, skip); + } + + /** + * Determines if the graph RDD should be cached or not. If {@code true} then + * {@link #graphStorageLevel(StorageLevel)} is ignored. 
By default, this value is {@code false}. + */ + public SparkGraphComputer skipGraphCache(final boolean skip) { + return configure(GREMLIN_SPARK_SKIP_GRAPH_CACHE, skip); + } + + /** + * Specifies the {@code org.apache.spark.serializer.Serializer} implementation to use. By default, this value is - * set to {@link GryoSerializer}. ++ * set to {@code org.apache.spark.serializer.KryoSerializer}. + */ + public SparkGraphComputer serializer(final Class<? extends Serializer> serializer) { + return configure(SPARK_SERIALIZER, serializer.getCanonicalName()); + } + ++ /** ++ * Specifies the {@code org.apache.spark.serializer.KryoRegistrator} to use to install additional types. By ++ * default this value is set to TinkerPop's {@link GryoRegistrator}. ++ */ ++ public SparkGraphComputer sparkKryoRegistrator(final Class<? extends KryoRegistrator> registrator) { ++ return configure(Constants.SPARK_KRYO_REGISTRATOR, registrator.getCanonicalName()); ++ } ++ ++ /** ++ * Determines if kryo registration is required such that attempts to serialize classes that are not registered ++ * will result in an error. By default this value is {@code false}. ++ */ ++ public SparkGraphComputer kryoRegistrationRequired(final boolean required) { ++ return configure(SPARK_KRYO_REGISTRATION_REQUIRED, required); ++ } ++ @Override public Future<ComputerResult> submit() { this.validateStatePriorToExecution();