Updated Branches: refs/heads/trunk a68f2bada -> 9c47b670a
GIRAPH-577: Create a testing framework that doesn't require I/O formats (ves via apresta) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/9c47b670 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/9c47b670 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/9c47b670 Branch: refs/heads/trunk Commit: 9c47b670a1972cb64e3a1d92ad376e6f0db10b1f Parents: a68f2ba Author: Alessandro Presta <[email protected]> Authored: Fri Mar 29 11:26:01 2013 -0700 Committer: Alessandro Presta <[email protected]> Committed: Fri Mar 29 11:26:01 2013 -0700 ---------------------------------------------------------------------- CHANGELOG | 2 + .../apache/giraph/utils/InternalVertexRunner.java | 111 +++++++++++++++ 2 files changed, 113 insertions(+), 0 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/9c47b670/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 4e2036f..ee54f00 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 0.2.0 - unreleased + GIRAPH-577: Create a testing framework that doesn't require I/O formats (ves via apresta) + GIRAPH-593: Update Hive IO performance improvements (nitay) GIRAPH-594: auto set reusing objects (nitay) http://git-wip-us.apache.org/repos/asf/giraph/blob/9c47b670/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java index 4b03127..e389e01 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/InternalVertexRunner.java @@ -24,6 +24,8 @@ import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.job.GiraphJob; import org.apache.giraph.io.formats.GiraphFileInputFormat; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.zookeeper.server.ServerConfig; @@ -218,6 +220,115 @@ public class InternalVertexRunner { } /** + * Attempts to run the vertex internally in the current JVM, reading and + * writing to an in-memory graph. Will start its own zookeeper + * instance. + * @param <I> The vertex index type + * @param <V> The vertex type + * @param <E> The edge type + * @param <M> The message type + * @param classes GiraphClasses specifying which types to use + * @param params a map of parameters to add to the hadoop configuration + * @param graph input graph + * @return iterable output data + * @throws Exception if anything goes wrong + */ + public static <I extends WritableComparable, + V extends Writable, + E extends Writable, + M extends Writable> TestGraph<I, V, E, M> run( + GiraphClasses<I, V, E, M> classes, + Map<String, String> params, + TestGraph<I, V, E, M> graph) throws Exception { + File tmpDir = null; + try { + // Prepare temporary folders + tmpDir = FileUtils.createTestDir(classes.getVertexClass()); + + File zkDir = FileUtils.createTempDir(tmpDir, "_bspZooKeeper"); + File zkMgrDir = FileUtils.createTempDir(tmpDir, "_defaultZkManagerDir"); + File checkpointsDir = FileUtils.createTempDir(tmpDir, "_checkpoints"); + + // Create and configure the job to run the vertex + GiraphJob job = new GiraphJob(classes.getVertexClass().getName()); + + InMemoryVertexInputFormat.setGraph(graph); + + GiraphConfiguration conf = job.getConfiguration(); + conf.setVertexClass(classes.getVertexClass()); + conf.setVertexEdgesClass(classes.getVertexEdgesClass()); + conf.setVertexInputFormatClass(InMemoryVertexInputFormat.class); + conf.setInputVertexEdgesClass(classes.getInputVertexEdgesClass()); + conf.setVertexValueFactoryClass(classes.getVertexValueFactoryClass()); + if (classes.hasWorkerContextClass()) { + conf.setWorkerContextClass(classes.getWorkerContextClass()); + } + if (classes.hasCombinerClass()) { + conf.setVertexCombinerClass(classes.getCombinerClass()); + } + if (classes.hasMasterComputeClass()) { + conf.setMasterComputeClass(classes.getMasterComputeClass()); + } + if (classes.hasVertexInputFormat()) { + conf.setVertexInputFormatClass(classes.getVertexInputFormatClass()); + } + if (classes.hasEdgeInputFormat()) { + conf.setEdgeInputFormatClass(classes.getEdgeInputFormatClass()); + } + if (classes.hasVertexOutputFormat()) { + conf.setVertexOutputFormatClass(classes.getVertexOutputFormatClass()); + } + + conf.setWorkerConfiguration(1, 1, 100.0f); + GiraphConstants.SPLIT_MASTER_WORKER.set(conf, false); + GiraphConstants.LOCAL_TEST_MODE.set(conf, true); + conf.set(GiraphConstants.ZOOKEEPER_LIST, "localhost:" + + String.valueOf(LOCAL_ZOOKEEPER_PORT)); + + conf.set(GiraphConstants.ZOOKEEPER_DIR, zkDir.toString()); + GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(conf, + zkMgrDir.toString()); + GiraphConstants.CHECKPOINT_DIRECTORY.set(conf, checkpointsDir.toString()); + + for (Map.Entry<String, String> param : params.entrySet()) { + conf.set(param.getKey(), param.getValue()); + } + + // Configure a local zookeeper instance + Properties zkProperties = configLocalZooKeeper(zkDir); + + QuorumPeerConfig qpConfig = new QuorumPeerConfig(); + qpConfig.parseProperties(zkProperties); + + // Create and run the zookeeper instance + final InternalZooKeeper zookeeper = new InternalZooKeeper(); + final ServerConfig zkConfig = new ServerConfig(); + zkConfig.readFrom(qpConfig); + + ExecutorService executorService = Executors.newSingleThreadExecutor(); + executorService.execute(new Runnable() { + @Override + public void run() { + try { + zookeeper.runFromConfig(zkConfig); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + }); + try { + job.run(true); + } finally { + executorService.shutdown(); + zookeeper.end(); + } + return graph; + } finally { + FileUtils.delete(tmpDir); + } + } + + /** * Configuration options for running local ZK. * * @param zkDir directory for ZK to hold files in.
