Updated Branches: refs/heads/trunk 4b01c88e4 -> efe6bf3d6
GIRAPH-709: More flexible Jython script loading (nitay) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/efe6bf3d Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/efe6bf3d Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/efe6bf3d Branch: refs/heads/trunk Commit: efe6bf3d6ece4bedfd0ab0ba649bcc558d1b0c80 Parents: 4b01c88 Author: Nitay Joffe <[email protected]> Authored: Wed Jul 10 07:52:47 2013 -0400 Committer: Nitay Joffe <[email protected]> Committed: Wed Jul 10 08:07:19 2013 -0400 ---------------------------------------------------------------------- CHANGELOG | 2 + .../giraph/benchmark/PageRankBenchmark.java | 15 +- .../giraph/conf/JsonStringConfOption.java | 151 +++++++++++ .../apache/giraph/graph/GraphTaskManager.java | 7 +- .../org/apache/giraph/jython/DeployType.java | 28 -- .../giraph/jython/JythonComputationFactory.java | 131 +++------- .../org/apache/giraph/jython/JythonUtils.java | 38 ++- .../org/apache/giraph/scripting/DeployType.java | 28 ++ .../apache/giraph/scripting/DeployedScript.java | 96 +++++++ .../apache/giraph/scripting/ScriptLoader.java | 253 +++++++++++++++++++ .../apache/giraph/scripting/package-info.java | 21 ++ .../apache/giraph/utils/ConfigurationUtils.java | 8 +- .../org/apache/giraph/jython/TestJython.java | 13 +- 13 files changed, 636 insertions(+), 155 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/efe6bf3d/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 6b792b8..b1f44c2 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 1.1.0 - unreleased + GIRAPH-709: More flexible Jython script loading (nitay) + GIRAPH-708: Factories for creation of all IVEM types (nitay) GIRAPH-710: Define zookeeper version in a property to allow build time 
http://git-wip-us.apache.org/repos/asf/giraph/blob/efe6bf3d/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java index 413107d..acc1c46 100644 --- a/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java +++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/PageRankBenchmark.java @@ -23,9 +23,11 @@ import org.apache.giraph.combiner.FloatSumCombiner; import org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.conf.GiraphTypes; import org.apache.giraph.edge.IntNullArrayEdges; +import org.apache.giraph.graph.Language; import org.apache.giraph.io.formats.PseudoRandomInputFormatConstants; import org.apache.giraph.io.formats.PseudoRandomIntNullVertexInputFormat; -import org.apache.giraph.jython.DeployType; +import org.apache.giraph.scripting.DeployType; +import org.apache.giraph.scripting.ScriptLoader; import org.apache.giraph.jython.JythonUtils; import org.apache.giraph.utils.DistributedCacheUtils; import org.apache.giraph.utils.ReflectionUtils; @@ -36,8 +38,6 @@ import com.google.common.collect.Sets; import java.util.Set; -import static org.apache.giraph.jython.JythonComputationFactory.JYTHON_DEPLOY_TYPE; - /** * Benchmark for {@link PageRankComputation} */ @@ -56,19 +56,22 @@ public class PageRankBenchmark extends GiraphBenchmark { if (BenchmarkOption.JYTHON.optionTurnedOn(cmd)) { GiraphTypes types = new GiraphTypes(); types.inferFrom(PageRankComputation.class); + String script; + DeployType deployType; if (BenchmarkOption.SCRIPT_PATH.optionTurnedOn(cmd)) { - JYTHON_DEPLOY_TYPE.set(conf, DeployType.DISTRIBUTED_CACHE); + deployType = DeployType.DISTRIBUTED_CACHE; String path = BenchmarkOption.SCRIPT_PATH.getOptionValue(cmd); Path hadoopPath = new Path(path); Path 
remotePath = DistributedCacheUtils.copyAndAdd(hadoopPath, conf); script = remotePath.toString(); } else { - JYTHON_DEPLOY_TYPE.set(conf, DeployType.RESOURCE); + deployType = DeployType.RESOURCE; script = ReflectionUtils.getPackagePath(this) + "/page-rank.py"; } + ScriptLoader.setScriptsToLoad(conf, script, deployType, Language.JYTHON); types.writeIfUnset(conf); - JythonUtils.init(conf, script, "PageRank"); + JythonUtils.init(conf, "PageRank"); } else { conf.setComputationClass(PageRankComputation.class); } http://git-wip-us.apache.org/repos/asf/giraph/blob/efe6bf3d/giraph-core/src/main/java/org/apache/giraph/conf/JsonStringConfOption.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/JsonStringConfOption.java b/giraph-core/src/main/java/org/apache/giraph/conf/JsonStringConfOption.java new file mode 100644 index 0000000..28f9388 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/conf/JsonStringConfOption.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.conf; + +import org.apache.hadoop.conf.Configuration; +import org.apache.log4j.Logger; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.type.TypeReference; + +import java.io.IOException; + +/** + * JSON String configuration option + */ +public class JsonStringConfOption extends AbstractConfOption { + /** Logger */ + private static final Logger LOG = + Logger.getLogger(JsonStringConfOption.class); + + /** + * Constructor + * + * @param key String key name + * @param description String description of option + */ + public JsonStringConfOption(String key, String description) { + super(key, description); + } + + /** + * Serialize value to JSON and store it under this option's key + * + * @param conf Configuration + * @param value Object to serialize with Jackson + */ + public void set(Configuration conf, Object value) { + ObjectMapper mapper = new ObjectMapper(); + String jsonStr; + try { + jsonStr = mapper.writeValueAsString(value); + conf.set(getKey(), jsonStr); + } catch (IOException e) { + throw new IllegalStateException("Failed to set " + getKey() + + " with json value from " + value, e); + } + } + + /** + * Get raw JSON string + * + * @param conf Configuration + * @return raw JSON string value + */ + public String getRaw(Configuration conf) { + return conf.get(getKey()); + } + + /** + * Get JSON value, or null if the key is not set + * + * @param <T> JSON type + * @param conf Configuration + * @param klass Class to read into + * @return deserialized value, or null if not set + */ + public <T> T get(Configuration conf, Class<T> klass) { + String jsonStr = getRaw(conf); + T value = null; + if (jsonStr != null) { + ObjectMapper mapper = new ObjectMapper(); + try { + value = mapper.readValue(jsonStr, klass); + } catch (IOException e) { + throw new IllegalStateException("Failed to read json from key " + + getKey() + " with class " + klass, e); + } + } + return value; + } + + /** + * Get JSON value, or null if the key is not set + * + * @param <T> JSON type + * @param conf Configuration + * @param typeReference TypeReference for JSON type + * @return deserialized value, or null if not set + */
+ public <T> T get(Configuration conf, TypeReference<T> typeReference) { + String jsonStr = getRaw(conf); + T value = null; + if (jsonStr != null) { + ObjectMapper mapper = new ObjectMapper(); + try { + value = mapper.readValue(jsonStr, typeReference); + } catch (IOException e) { + throw new IllegalStateException("Failed to read json from key " + + getKey() + " with type " + typeReference.getType(), e); + } + } + return value; + } + + /** + * Get JSON value, or default if not present + * + * @param <T> JSON type + * @param conf Configuration + * @param klass Class to read into + * @param defaultValue Default value if not found + * @return JSON value + */ + public <T> T getWithDefault(Configuration conf, Class<T> klass, + T defaultValue) { + if (contains(conf)) { + return get(conf, klass); + } else { + return defaultValue; + } + } + + @Override + public String getDefaultValueStr() { + return "null"; + } + + @Override + public boolean isDefaultValue(Configuration conf) { + return !contains(conf); + } + + @Override + public ConfOptionType getType() { + return ConfOptionType.STRING; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/efe6bf3d/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java index e81c7c4..b0982b3 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/GraphTaskManager.java @@ -24,6 +24,7 @@ import org.apache.giraph.bsp.CentralizedServiceWorker; import org.apache.giraph.comm.messages.MessageStore; import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.scripting.ScriptLoader; import org.apache.giraph.master.BspServiceMaster; import
org.apache.giraph.master.MasterAggregatorUsage; import org.apache.giraph.master.MasterThread; @@ -190,10 +191,12 @@ public class GraphTaskManager<I extends WritableComparable, V extends Writable, initializeAndConfigureLogging(); // init the metrics objects setupAndInitializeGiraphMetrics(); - // One time setup for computation factory - conf.createComputationFactory().initialize(conf); // Check input checkInput(); + // Load any scripts that were deployed + ScriptLoader.loadScripts(conf); + // One time setup for computation factory + conf.createComputationFactory().initialize(conf); // Do some task setup (possibly starting up a Zookeeper service) context.setStatus("setup: Initializing Zookeeper services."); locateZookeeperClasspath(zkPathList); http://git-wip-us.apache.org/repos/asf/giraph/blob/efe6bf3d/giraph-core/src/main/java/org/apache/giraph/jython/DeployType.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/jython/DeployType.java b/giraph-core/src/main/java/org/apache/giraph/jython/DeployType.java deleted file mode 100644 index d916119..0000000 --- a/giraph-core/src/main/java/org/apache/giraph/jython/DeployType.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.giraph.jython; - -/** - * Type of deployment for a file - */ -public enum DeployType { - /** Resource packaged with jar */ - RESOURCE, - /** Hadoop's Distributed Cache */ - DISTRIBUTED_CACHE -} http://git-wip-us.apache.org/repos/asf/giraph/blob/efe6bf3d/giraph-core/src/main/java/org/apache/giraph/jython/JythonComputationFactory.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/jython/JythonComputationFactory.java b/giraph-core/src/main/java/org/apache/giraph/jython/JythonComputationFactory.java index b714e91..80e4e76 100644 --- a/giraph-core/src/main/java/org/apache/giraph/jython/JythonComputationFactory.java +++ b/giraph-core/src/main/java/org/apache/giraph/jython/JythonComputationFactory.java @@ -17,123 +17,70 @@ */ package org.apache.giraph.jython; -import org.apache.giraph.conf.EnumConfOption; import org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.conf.StrConfOption; -import org.apache.giraph.graph.Computation; import org.apache.giraph.factories.ComputationFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; +import org.apache.giraph.graph.Computation; +import org.apache.giraph.scripting.ScriptLoader; import org.apache.log4j.Logger; import org.python.core.PyObject; import org.python.util.PythonInterpreter; -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.io.Closeables; - -import java.io.BufferedInputStream; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; - -import static org.apache.giraph.utils.DistributedCacheUtils.getLocalCacheFile; +import static com.google.common.base.Preconditions.checkNotNull; +import static 
org.apache.giraph.scripting.ScriptLoader.SCRIPTS_TO_LOAD; /** * Factory for creating Jython Computation from python scripts */ public class JythonComputationFactory implements ComputationFactory { - /** Type of script path */ - public static final EnumConfOption<DeployType> JYTHON_DEPLOY_TYPE = - EnumConfOption.create("giraph.jython.deploy.type", - DeployType.class, DeployType.DISTRIBUTED_CACHE, - "Type of script path"); - /** Path to Jython script */ - public static final StrConfOption JYTHON_SCRIPT_PATH = - new StrConfOption("giraph.jython.path", "_script_not_set_", - "Path to Jython script"); /** Name of Computation class in Jython script */ - public static final StrConfOption JYTHON_COMPUTATION_CLASS = + public static final StrConfOption JYTHON_COMPUTATION_CLASS_NAME = new StrConfOption("giraph.jython.class", "_computation_class_not_set_", "Name of Computation class in Jython script"); + /** The Jython compute function, cached here for fast access */ + private static volatile PyObject JYTHON_COMPUTATION_MODULE; + /** Logger */ private static final Logger LOG = Logger.getLogger(JythonUtils.class); - @Override - public void initialize(ImmutableClassesGiraphConfiguration conf) { - String scriptPath = JYTHON_SCRIPT_PATH.get(conf); - InputStream pythonStream = getPythonScriptStream(conf, scriptPath); - try { - PythonInterpreter interpreter = new PythonInterpreter(); - if (LOG.isInfoEnabled()) { - LOG.info("initComputation: Jython loading script " + scriptPath); - } - interpreter.execfile(pythonStream); - - String className = computationName(conf); - PyObject pyComputationModule = interpreter.get(className); - - JythonUtils.setPythonComputationModule(pyComputationModule); - } finally { - Closeables.closeQuietly(pythonStream); - } + /** + * Set static python computation module stored + * + * @param mod python computation module + */ + private static void setPythonComputationModule(PyObject mod) { + JYTHON_COMPUTATION_MODULE = mod; } /** - * Get an {@link InputStream} 
for the jython script. + * Get python computation module stored * - * @param conf Configuration - * @param path script path - * @return {@link InputStream} for reading script + * @return python computation module */ - private InputStream getPythonScriptStream(Configuration conf, - String path) { - InputStream stream = null; - DeployType deployType = JYTHON_DEPLOY_TYPE.get(conf); - switch (deployType) { - case RESOURCE: - if (LOG.isInfoEnabled()) { - LOG.info("getPythonScriptStream: Reading Jython Computation " + - "from resource at " + path); - } - stream = getClass().getResourceAsStream(path); - if (stream == null) { - throw new IllegalStateException("getPythonScriptStream: Failed to " + - "open Jython script from resource at " + path); - } - break; - case DISTRIBUTED_CACHE: - if (LOG.isInfoEnabled()) { - LOG.info("getPythonScriptStream: Reading Jython Computation " + - "from DistributedCache at " + path); - } - Optional<Path> localPath = getLocalCacheFile(conf, path); - if (!localPath.isPresent()) { - throw new IllegalStateException("getPythonScriptStream: Failed to " + - "find Jython script in local DistributedCache matching " + path); - } - String pathStr = localPath.get().toString(); - try { - stream = new BufferedInputStream(new FileInputStream(pathStr)); - } catch (IOException e) { - throw new IllegalStateException("getPythonScriptStream: Failed open " + - "Jython script from DistributedCache at " + localPath); - } - break; - default: - throw new IllegalArgumentException("getPythonScriptStream: Unknown " + - "Jython script deployment type: " + deployType); - } - return stream; + private static PyObject getPythonComputationModule() { + return JYTHON_COMPUTATION_MODULE; + } + + @Override + public void initialize(ImmutableClassesGiraphConfiguration conf) { + PythonInterpreter interpreter = JythonUtils.getInterpreter(); + String className = computationName(conf); + PyObject pyComputationModule = interpreter.get(className); + checkNotNull(pyComputationModule, + 
"Could not find Jython Computation class " + className + + " in loaded scripts: " + ScriptLoader.getLoadedScripts()); + setPythonComputationModule(pyComputationModule); } @Override public Computation createComputation( ImmutableClassesGiraphConfiguration conf) { - PyObject pyComputationModule = JythonUtils.getPythonComputationModule(); - Preconditions.checkNotNull(pyComputationModule); + PyObject pyComputationModule = getPythonComputationModule(); + checkNotNull(pyComputationModule, + "Jython Computation class not set in loaded scripts: " + + ScriptLoader.getLoadedScripts()); PyObject pyComputationObj = pyComputationModule.__call__(); Object computationObj = pyComputationObj.__tojava__(Computation.class); @@ -148,18 +95,18 @@ public class JythonComputationFactory implements ComputationFactory { @Override public void checkConfiguration(ImmutableClassesGiraphConfiguration conf) { - if (JYTHON_SCRIPT_PATH.isDefaultValue(conf)) { + if (SCRIPTS_TO_LOAD.isDefaultValue(conf)) { throw new IllegalStateException("checkConfiguration: " + - JYTHON_SCRIPT_PATH.getKey() + " not set in configuration"); + SCRIPTS_TO_LOAD.getKey() + " not set in configuration"); } - if (JYTHON_COMPUTATION_CLASS.isDefaultValue(conf)) { + if (JYTHON_COMPUTATION_CLASS_NAME.isDefaultValue(conf)) { throw new IllegalStateException("checkConfiguration: " + - JYTHON_COMPUTATION_CLASS.getKey() + " not set in configuration"); + JYTHON_COMPUTATION_CLASS_NAME.getKey() + " not set in configuration"); } } @Override public String computationName(GiraphConfiguration conf) { - return JYTHON_COMPUTATION_CLASS.get(conf); + return JYTHON_COMPUTATION_CLASS_NAME.get(conf); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/efe6bf3d/giraph-core/src/main/java/org/apache/giraph/jython/JythonUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/jython/JythonUtils.java b/giraph-core/src/main/java/org/apache/giraph/jython/JythonUtils.java 
index 77040e3..e747e84 100644 --- a/giraph-core/src/main/java/org/apache/giraph/jython/JythonUtils.java +++ b/giraph-core/src/main/java/org/apache/giraph/jython/JythonUtils.java @@ -19,53 +19,45 @@ package org.apache.giraph.jython; import org.apache.giraph.graph.Language; import org.apache.hadoop.conf.Configuration; -import org.python.core.PyObject; +import org.python.util.PythonInterpreter; import static org.apache.giraph.conf.GiraphConstants.COMPUTATION_FACTORY_CLASS; import static org.apache.giraph.conf.GiraphConstants.COMPUTATION_LANGUAGE; -import static org.apache.giraph.jython.JythonComputationFactory.JYTHON_COMPUTATION_CLASS; -import static org.apache.giraph.jython.JythonComputationFactory.JYTHON_SCRIPT_PATH; +import static org.apache.giraph.jython.JythonComputationFactory.JYTHON_COMPUTATION_CLASS_NAME; /** * Helpers for running jobs with Jython. */ public class JythonUtils { - /** The Jython compute function, cached here for fast access */ - private static volatile PyObject JYTHON_COMPUTATION_MODULE; + /** + * The Jython interpreter. Cached here for fast access. We use a singleton + * for this so that we can parse all of the Jython scripts once at startup + * and then have their data loaded for the rest of the job. 
+ */ + private static final PythonInterpreter INTERPRETER = + new PythonInterpreter(); /** Don't construct */ private JythonUtils() { } /** - * Set static python computation module stored - * - * @param mod python computation module - */ - public static void setPythonComputationModule(PyObject mod) { - JYTHON_COMPUTATION_MODULE = mod; - } - - /** - * Get python computation module stored + * Get Jython interpreter * - * @return python computation module + * @return interpreter */ - public static PyObject getPythonComputationModule() { - return JYTHON_COMPUTATION_MODULE; + public static PythonInterpreter getInterpreter() { + return INTERPRETER; } /** * Sets up the Configuration for using Jython * * @param conf Configuration to se - * @param scriptPath Path to Jython script (resource or distributed cache) * @param klassName Class name of Jython Computation */ - public static void init(Configuration conf, String scriptPath, - String klassName) { + public static void init(Configuration conf, String klassName) { COMPUTATION_LANGUAGE.set(conf, Language.JYTHON); COMPUTATION_FACTORY_CLASS.set(conf, JythonComputationFactory.class); - JYTHON_SCRIPT_PATH.set(conf, scriptPath); - JYTHON_COMPUTATION_CLASS.set(conf, klassName); + JYTHON_COMPUTATION_CLASS_NAME.set(conf, klassName); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/efe6bf3d/giraph-core/src/main/java/org/apache/giraph/scripting/DeployType.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/scripting/DeployType.java b/giraph-core/src/main/java/org/apache/giraph/scripting/DeployType.java new file mode 100644 index 0000000..fc3054a --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/scripting/DeployType.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.scripting; + +/** + * Type of deployment for a file + */ +public enum DeployType { + /** Resource packaged with jar */ + RESOURCE, + /** Hadoop's Distributed Cache */ + DISTRIBUTED_CACHE +} http://git-wip-us.apache.org/repos/asf/giraph/blob/efe6bf3d/giraph-core/src/main/java/org/apache/giraph/scripting/DeployedScript.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/scripting/DeployedScript.java b/giraph-core/src/main/java/org/apache/giraph/scripting/DeployedScript.java new file mode 100644 index 0000000..c4353d3 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/scripting/DeployedScript.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.scripting; + +import org.apache.giraph.graph.Language; +import org.codehaus.jackson.annotate.JsonCreator; +import org.codehaus.jackson.annotate.JsonProperty; + +import com.google.common.base.Objects; + +/** + * A script that was deployed to the cluster. + */ +public class DeployedScript { + /** How the script was deployed */ + @JsonProperty + private final DeployType deployType; + /** Path to the script */ + @JsonProperty + private final String path; + /** Programming language the script is written in */ + @JsonProperty + private final Language language; + + /** + * Constructor + * + * @param path String path to resource + * @param deployType deployment type + * @param language programming language + */ + @JsonCreator + public DeployedScript( + @JsonProperty("path") String path, + @JsonProperty("deployType") DeployType deployType, + @JsonProperty("language") Language language) { + this.path = path; + this.deployType = deployType; + this.language = language; + } + + public DeployType getDeployType() { + return deployType; + } + + public String getPath() { + return path; + } + + public Language getLanguage() { + return language; + } + + @Override + public int hashCode() { + return Objects.hashCode(path, deployType, language); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj instanceof DeployedScript) { + DeployedScript other = (DeployedScript) obj; + return Objects.equal(path, other.path) && + Objects.equal(deployType, other.deployType) && + Objects.equal(language, 
other.language); + } + return false; + } + + @Override + public String toString() { + return Objects.toStringHelper(this) + .add("path", path) + .add("deployType", deployType) + .add("language", language) + .toString(); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/efe6bf3d/giraph-core/src/main/java/org/apache/giraph/scripting/ScriptLoader.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/scripting/ScriptLoader.java b/giraph-core/src/main/java/org/apache/giraph/scripting/ScriptLoader.java new file mode 100644 index 0000000..bf3e152 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/scripting/ScriptLoader.java @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.giraph.scripting;
+
+import org.apache.giraph.conf.JsonStringConfOption;
+import org.apache.giraph.graph.Language;
+import org.apache.giraph.jython.JythonUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.type.TypeReference;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+
+import java.io.BufferedInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+import static org.apache.giraph.utils.DistributedCacheUtils.getLocalCacheFile;
+
+/**
+ * Loads scripts written by user in other languages, for example Jython.
+ */
+public class ScriptLoader {
+  /** Option for scripts to load on workers */
+  public static final JsonStringConfOption SCRIPTS_TO_LOAD =
+      new JsonStringConfOption("giraph.scripts.to.load",
+          "Scripts to load on workers");
+
+  /** Scripts that were loaded */
+  private static final List<DeployedScript> LOADED_SCRIPTS =
+      Lists.newArrayList();
+
+  /** Logger */
+  private static final Logger LOG = Logger.getLogger(ScriptLoader.class);
+
+  /** Don't construct */
+  private ScriptLoader() { }
+
+  /**
+   * Deploy a script
+   *
+   * @param conf {@link Configuration}
+   * @param scriptPath Path to script
+   * @param deployType type of deployment
+   * @param language programming language
+   */
+  public static void setScriptsToLoad(Configuration conf,
+      String scriptPath, DeployType deployType, Language language) {
+    DeployedScript deployedScript = new DeployedScript(scriptPath,
+        deployType, language);
+    setScriptsToLoad(conf, deployedScript);
+  }
+
+  /**
+   * Deploy pair of scripts
+   *
+   * @param conf {@link Configuration}
+   * @param script1 Path to script
+   * @param deployType1 type of deployment
+   * @param language1 programming language
+   * @param script2 Path to script
+   * @param deployType2 type of deployment
+   * @param language2 programming language
+   */
+  public static void setScriptsToLoad(Configuration conf,
+      String script1, DeployType deployType1, Language language1,
+      String script2, DeployType deployType2, Language language2) {
+    DeployedScript deployedScript1 = new DeployedScript(script1,
+        deployType1, language1);
+    DeployedScript deployedScript2 = new DeployedScript(script2,
+        deployType2, language2);
+    setScriptsToLoad(conf, deployedScript1, deployedScript2);
+  }
+
+  /**
+   * Deploy scripts
+   *
+   * @param conf Configuration
+   * @param scripts the scripts to deploy
+   */
+  public static void setScriptsToLoad(Configuration conf,
+      DeployedScript... scripts) {
+    List<DeployedScript> scriptsToLoad = Lists.newArrayList(scripts);
+    SCRIPTS_TO_LOAD.set(conf, scriptsToLoad);
+  }
+
+  /**
+   * Add a script to load on workers
+   *
+   * @param conf {@link Configuration}
+   * @param script Path to script
+   * @param deployType type of deployment
+   * @param language programming language
+   */
+  public static void addScriptToLoad(Configuration conf,
+      String script, DeployType deployType, Language language) {
+    addScriptToLoad(conf, new DeployedScript(script, deployType, language));
+  }
+
+  /**
+   * Add a script to load on workers
+   *
+   * @param conf {@link Configuration}
+   * @param script the script to load
+   */
+  public static void addScriptToLoad(Configuration conf,
+      DeployedScript script) {
+    List<DeployedScript> scriptsToLoad = getScriptsToLoad(conf);
+    if (scriptsToLoad == null) {
+      scriptsToLoad = Lists.<DeployedScript>newArrayList();
+    }
+    scriptsToLoad.add(script);
+    SCRIPTS_TO_LOAD.set(conf, scriptsToLoad);
+  }
+
+  /**
+   * Get the list of scripts to load on workers
+   *
+   * @param conf {@link Configuration}
+   * @return list of {@link DeployedScript}s
+   */
+  public static List<DeployedScript> getScriptsToLoad(Configuration conf) {
+    TypeReference<List<DeployedScript>> jsonType =
+        new TypeReference<List<DeployedScript>>() { };
+    return SCRIPTS_TO_LOAD.get(conf, jsonType);
+  }
+
+  /**
+   * Load all the scripts deployed in Configuration
+   *
+   * @param conf Configuration
+   */
+  public static void loadScripts(Configuration conf) {
+    List<DeployedScript> deployedScripts = getScriptsToLoad(conf);
+    if (deployedScripts == null) {
+      return;
+    }
+    for (DeployedScript deployedScript : deployedScripts) {
+      loadScript(conf, deployedScript);
+    }
+  }
+
+  /**
+   * Load a single deployed script, closing its stream even if loading fails
+   *
+   * @param conf Configuration
+   * @param deployedScript the deployed script
+   */
+  public static void loadScript(Configuration conf,
+      DeployedScript deployedScript) {
+    InputStream stream = openScriptInputStream(conf, deployedScript);
+    try {
+      switch (deployedScript.getLanguage()) {
+      case JYTHON:
+        loadJythonScript(stream);
+        break;
+      default:
+        LOG.error("Don't know how to load script " + deployedScript);
+        throw new IllegalStateException("Don't know how to load script " +
+            deployedScript);
+      }
+      LOADED_SCRIPTS.add(deployedScript);
+    } finally {
+      Closeables.closeQuietly(stream);
+    }
+  }
+
+  /**
+   * Load a Jython deployed script
+   *
+   * @param stream InputStream with Jython code to load
+   */
+  private static void loadJythonScript(InputStream stream) {
+    JythonUtils.getInterpreter().execfile(stream);
+  }
+
+  /**
+   * Get list of scripts already loaded (the live internal list, not a copy).
+   *
+   * @return list of loaded scripts
+   */
+  public static List<DeployedScript> getLoadedScripts() {
+    return LOADED_SCRIPTS;
+  }
+
+  /**
+   * Get an {@link java.io.InputStream} for the deployed script.
+   *
+   * @param conf Configuration
+   * @param deployedScript the deployed script
+   * @return {@link java.io.InputStream} for reading script
+   */
+  private static InputStream openScriptInputStream(Configuration conf,
+      DeployedScript deployedScript) {
+    DeployType deployType = deployedScript.getDeployType();
+    String path = deployedScript.getPath();
+    InputStream stream;
+    switch (deployType) {
+    case RESOURCE:
+      if (LOG.isInfoEnabled()) {
+        LOG.info("getScriptStream: Reading script from resource at " + path);
+      }
+      stream = ScriptLoader.class.getClassLoader().getResourceAsStream(path);
+      if (stream == null) {
+        throw new IllegalStateException("getScriptStream: Failed to " +
+            "open script from resource at " + path);
+      }
+      break;
+    case DISTRIBUTED_CACHE:
+      if (LOG.isInfoEnabled()) {
+        LOG.info("getScriptStream: Reading script from DistributedCache at " +
+            path);
+      }
+      Optional<Path> localPath = getLocalCacheFile(conf, path);
+      if (!localPath.isPresent()) {
+        throw new IllegalStateException("getScriptStream: Failed to " +
+            "find script in local DistributedCache matching " + path);
+      }
+      String pathStr = localPath.get().toString();
+      try {
+        stream = new BufferedInputStream(new FileInputStream(pathStr));
+      } catch (IOException e) {
+        throw new IllegalStateException("getScriptStream: Failed to open " +
+            "script from DistributedCache at " + pathStr, e);
+      }
+      break;
+    default:
+      throw new IllegalArgumentException("getScriptStream: Unknown " +
+          "script deployment type: " + deployType);
+    }
+    return stream;
+  }
+}
http://git-wip-us.apache.org/repos/asf/giraph/blob/efe6bf3d/giraph-core/src/main/java/org/apache/giraph/scripting/package-info.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/scripting/package-info.java b/giraph-core/src/main/java/org/apache/giraph/scripting/package-info.java
new file mode 100644
index 0000000..fbe077c
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/scripting/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Scripting with Giraph.
+ */
+package org.apache.giraph.scripting;
http://git-wip-us.apache.org/repos/asf/giraph/blob/efe6bf3d/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
index 6b89403..0424ff5 100644
--- a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
+++ b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationUtils.java
@@ -33,11 +33,14 @@ import org.apache.giraph.conf.TypesHolder;
 import org.apache.giraph.edge.OutEdges;
 import org.apache.giraph.graph.Computation;
 import org.apache.giraph.factories.VertexValueFactory;
+import org.apache.giraph.graph.Language;
 import org.apache.giraph.io.EdgeInputFormat;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexOutputFormat;
 import org.apache.giraph.io.formats.GiraphFileInputFormat;
 import org.apache.giraph.job.GiraphConfigurationValidator;
+import org.apache.giraph.scripting.DeployType;
+import org.apache.giraph.scripting.ScriptLoader;
 import org.apache.giraph.jython.JythonUtils;
 import org.apache.giraph.master.MasterCompute;
 import org.apache.giraph.partition.Partition;
@@ -433,8 +436,11 @@ public final class ConfigurationUtils {
     Path path = new Path(scriptPath);
     Path remotePath = DistributedCacheUtils.copyAndAdd(path, conf);
 
+    ScriptLoader.setScriptsToLoad(conf, remotePath.toString(),
+        DeployType.DISTRIBUTED_CACHE, Language.JYTHON);
+
     GiraphTypes.readFrom(conf).writeIfUnset(conf);
-    JythonUtils.init(conf, remotePath.toString(), jythonClass);
+    JythonUtils.init(conf, jythonClass);
   }
 
   /**
http://git-wip-us.apache.org/repos/asf/giraph/blob/efe6bf3d/giraph-core/src/test/java/org/apache/giraph/jython/TestJython.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/jython/TestJython.java b/giraph-core/src/test/java/org/apache/giraph/jython/TestJython.java
index 58f25a6..b5110bb 100644
--- a/giraph-core/src/test/java/org/apache/giraph/jython/TestJython.java
+++ b/giraph-core/src/test/java/org/apache/giraph/jython/TestJython.java
@@ -20,12 +20,16 @@ package org.apache.giraph.jython;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphTypes;
 import org.apache.giraph.edge.ByteArrayEdges;
+import org.apache.giraph.graph.Language;
 import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
 import org.apache.giraph.io.formats.IntNullTextEdgeInputFormat;
+import org.apache.giraph.scripting.DeployType;
+import org.apache.giraph.scripting.ScriptLoader;
 import org.apache.giraph.utils.InternalVertexRunner;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.junit.Test;
+import org.python.core.PyClass;
 import org.python.core.PyDictionary;
 import org.python.core.PyInteger;
 import org.python.core.PyList;
@@ -36,7 +40,6 @@ import com.google.common.collect.Maps;
 
 import java.util.Map;
 
-import static org.apache.giraph.jython.JythonComputationFactory.JYTHON_DEPLOY_TYPE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -66,6 +69,8 @@ public class TestJython {
     interpreter.exec(jython);
 
     PyObject fooClass = interpreter.get("Foo");
+    assertTrue(fooClass instanceof PyClass);
+
     PyObject getMapFunc = interpreter.get("get_map");
     PyObject getListFunc = interpreter.get("get_list");
     PyObject getIValFunc = interpreter.get("get_ival");
@@ -109,8 +114,10 @@ public class TestJython {
     GiraphTypes types = new GiraphTypes(IntWritable.class, IntWritable.class,
         NullWritable.class, NullWritable.class, NullWritable.class);
     types.writeIfUnset(conf);
-    JythonUtils.init(conf, "count-edges.py", "CountEdges");
-    JYTHON_DEPLOY_TYPE.set(conf, DeployType.RESOURCE);
+    ScriptLoader.setScriptsToLoad(conf,
+        "org/apache/giraph/jython/count-edges.py",
+        DeployType.RESOURCE, Language.JYTHON);
+    JythonUtils.init(conf, "CountEdges");
     conf.setOutEdgesClass(ByteArrayEdges.class);
     conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
     conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
