Allow DFS paths in `HADOOP_GREMLIN_LIBS`.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/5e96f353 Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/5e96f353 Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/5e96f353 Branch: refs/heads/TINKERPOP-1331 Commit: 5e96f353b95205b701fc9663aec87183746badf4 Parents: e790e56 Author: Daniel Kuppitz <daniel_kupp...@hotmail.com> Authored: Thu Jun 9 13:46:34 2016 +0200 Committer: Daniel Kuppitz <daniel_kupp...@hotmail.com> Committed: Mon Jun 13 09:08:09 2016 +0200 ---------------------------------------------------------------------- .../process/computer/GiraphGraphComputer.java | 53 ++++++++------------ .../computer/AbstractHadoopGraphComputer.java | 46 +++++++++++++++++ .../process/computer/SparkGraphComputer.java | 28 ++--------- 3 files changed, 72 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/5e96f353/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java ---------------------------------------------------------------------- diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java index 012b9fc..b06b40a 100644 --- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java +++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java @@ -61,13 +61,14 @@ import org.apache.tinkerpop.gremlin.util.Gremlin; import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils; import java.io.File; +import java.io.IOException; import java.io.NotSerializableException; +import java.net.URI; import java.util.HashSet; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.Future; -import java.util.stream.Stream; /** * @author Marko A. Rodriguez (http://markorodriguez.com) @@ -133,8 +134,7 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple final Configuration apacheConfiguration = ConfUtil.makeApacheConfiguration(this.giraphConfiguration); return CompletableFuture.<ComputerResult>supplyAsync(() -> { try { - final FileSystem fs = FileSystem.get(this.giraphConfiguration); - this.loadJars(fs); + this.loadJars(giraphConfiguration); ToolRunner.run(this, new String[]{}); } catch (final Exception e) { //e.printStackTrace(); @@ -247,36 +247,25 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple return this.giraphConfiguration; } - private void loadJars(final FileSystem fs) { - final String hadoopGremlinLibsRemote = "hadoop-gremlin-" + Gremlin.version() + "-libs"; - if (this.giraphConfiguration.getBoolean(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, true)) { - final String hadoopGremlinLibsLocal = null == System.getProperty(Constants.HADOOP_GREMLIN_LIBS) ? System.getenv(Constants.HADOOP_GREMLIN_LIBS) : System.getProperty(Constants.HADOOP_GREMLIN_LIBS); - if (null == hadoopGremlinLibsLocal) - this.logger.warn(Constants.HADOOP_GREMLIN_LIBS + " is not set -- proceeding regardless"); - else { - final String[] paths = hadoopGremlinLibsLocal.split(":"); - for (final String path : paths) { - final File file = AbstractHadoopGraphComputer.copyDirectoryIfNonExistent(fs, path); - if (file.exists()) { - Stream.of(file.listFiles()).filter(f -> f.getName().endsWith(Constants.DOT_JAR)).forEach(f -> { - try { - final Path jarFile = new Path(fs.getHomeDirectory() + "/" + hadoopGremlinLibsRemote + "/" + f.getName()); - if (!fs.exists(jarFile)) - fs.copyFromLocalFile(new Path(f.getPath()), jarFile); - try { - DistributedCache.addArchiveToClassPath(jarFile, this.giraphConfiguration, fs); - } catch (final Exception e) { - throw new RuntimeException(e.getMessage(), e); - } - } catch (final Exception e) { - throw new IllegalStateException(e.getMessage(), e); - } - }); - } else { - this.logger.warn(path + " does not reference a valid directory -- proceeding regardless"); - } - } + @Override + protected void loadJar(final org.apache.hadoop.conf.Configuration hadoopConfiguration, final File file, final Object... params) + throws IOException { + final FileSystem defaultFileSystem = FileSystem.get(hadoopConfiguration); + try { + final Path jarFile = new Path(defaultFileSystem.getHomeDirectory() + "/hadoop-gremlin-" + Gremlin.version() + "-libs/" + file.getName()); + if (!defaultFileSystem.exists(jarFile)) { + final Path sourcePath = new Path(file.getPath()); + final URI sourceUri = sourcePath.toUri(); + final FileSystem fs = FileSystem.get(sourceUri, hadoopConfiguration); + fs.copyFromLocalFile(sourcePath, jarFile); } + try { + DistributedCache.addArchiveToClassPath(jarFile, this.giraphConfiguration, defaultFileSystem); + } catch (final Exception e) { + throw new RuntimeException(e.getMessage(), e); + } + } catch (final Exception e) { + throw new IllegalStateException(e.getMessage(), e); } } http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/5e96f353/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java ---------------------------------------------------------------------- diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java index a05a1be..f5f332d 100644 --- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java +++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java @@ -18,6 +18,7 @@ */ package org.apache.tinkerpop.gremlin.hadoop.process.computer; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; @@ -41,15 +42,22 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; import java.util.HashSet; import java.util.Optional; import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Stream; /** * @author Marko A. Rodriguez (http://markorodriguez.com) */ public abstract class AbstractHadoopGraphComputer implements GraphComputer { + private final static Pattern PATH_PATTERN = Pattern.compile("([^:]|://)+"); + protected final Logger logger; protected final HadoopGraph hadoopGraph; protected boolean executed = false; @@ -139,6 +147,44 @@ public abstract class AbstractHadoopGraphComputer implements GraphComputer { throw GraphComputer.Exceptions.computerRequiresMoreWorkersThanSupported(this.workers, this.features().getMaxWorkers()); } + protected void loadJars(final Configuration hadoopConfiguration, final Object... params) { + if (hadoopConfiguration.getBoolean(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, true)) { + final String hadoopGremlinLibs = null == System.getProperty(Constants.HADOOP_GREMLIN_LIBS) ? System.getenv(Constants.HADOOP_GREMLIN_LIBS) : System.getProperty(Constants.HADOOP_GREMLIN_LIBS); + if (null == hadoopGremlinLibs) + this.logger.warn(Constants.HADOOP_GREMLIN_LIBS + " is not set -- proceeding regardless"); + else { + try { + final Matcher matcher = PATH_PATTERN.matcher(hadoopGremlinLibs); + while (matcher.find()) { + final String path = matcher.group(); + FileSystem fs; + try { + final URI uri = new URI(path); + fs = FileSystem.get(uri, hadoopConfiguration); + } catch (URISyntaxException e) { + fs = FileSystem.get(hadoopConfiguration); + } + final File file = AbstractHadoopGraphComputer.copyDirectoryIfNonExistent(fs, path); + if (file.exists()) { + for (final File f : file.listFiles()) { + if (f.getName().endsWith(Constants.DOT_JAR)) { + loadJar(hadoopConfiguration, f, params); + } + } + } + else + this.logger.warn(path + " does not reference a valid directory -- proceeding regardless"); + } + } catch (IOException e) { + throw new IllegalStateException(e.getMessage(), e); + } + } + } + } + + protected abstract void loadJar(final Configuration hadoopConfiguration, final File file, final Object... params) + throws IOException; + @Override public Features features() { return new Features(); http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/5e96f353/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java ---------------------------------------------------------------------- diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java index 5178225..40598c0 100644 --- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java +++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java @@ -78,7 +78,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ThreadFactory; -import java.util.stream.Stream; /** * @author Marko A. Rodriguez (http://markorodriguez.com) @@ -209,7 +208,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer { // execute the vertex program and map reducers and if there is a failure, auto-close the spark context try { final JavaSparkContext sparkContext = new JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration)); - this.loadJars(sparkContext, hadoopConfiguration); // add the project jars to the cluster + this.loadJars(hadoopConfiguration, sparkContext); // add the project jars to the cluster Spark.create(sparkContext.sc()); // this is the context RDD holder that prevents GC updateLocalConfiguration(sparkContext, sparkConfiguration); // create a message-passing friendly rdd from the input rdd @@ -384,27 +383,10 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer { ///////////////// - private void loadJars(final JavaSparkContext sparkContext, final Configuration hadoopConfiguration) { - if (hadoopConfiguration.getBoolean(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, true)) { - final String hadoopGremlinLocalLibs = null == System.getProperty(Constants.HADOOP_GREMLIN_LIBS) ? System.getenv(Constants.HADOOP_GREMLIN_LIBS) : System.getProperty(Constants.HADOOP_GREMLIN_LIBS); - if (null == hadoopGremlinLocalLibs) - this.logger.warn(Constants.HADOOP_GREMLIN_LIBS + " is not set -- proceeding regardless"); - else { - try { - final String[] paths = hadoopGremlinLocalLibs.split(":"); - final FileSystem fs = FileSystem.get(hadoopConfiguration); - for (final String path : paths) { - final File file = AbstractHadoopGraphComputer.copyDirectoryIfNonExistent(fs, path); - if (file.exists()) - Stream.of(file.listFiles()).filter(f -> f.getName().endsWith(Constants.DOT_JAR)).forEach(f -> sparkContext.addJar(f.getAbsolutePath())); - else - this.logger.warn(path + " does not reference a valid directory -- proceeding regardless"); - } - } catch (IOException e) { - throw new IllegalStateException(e.getMessage(), e); - } - } - } + @Override + protected void loadJar(final Configuration hadoopConfiguration, final File file, final Object... params) { + final JavaSparkContext sparkContext = (JavaSparkContext) params[0]; + sparkContext.addJar(file.getAbsolutePath()); } /**