Updated Branches: refs/heads/trunk e1a7f2905 -> 8c42e3f9b
GIRAPH-431: Support edge and vertex value input formats in GiraphRunner (apresta) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/8c42e3f9 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/8c42e3f9 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/8c42e3f9 Branch: refs/heads/trunk Commit: 8c42e3f9b6844668c029778bcd97603459ec98fc Parents: e1a7f29 Author: Alessandro Presta <[email protected]> Authored: Wed Jan 16 15:43:17 2013 -0800 Committer: Alessandro Presta <[email protected]> Committed: Wed Jan 16 16:05:05 2013 -0800 ---------------------------------------------------------------------- CHANGELOG | 4 +- .../main/java/org/apache/giraph/GiraphRunner.java | 115 +++++++++------ .../apache/giraph/graph/GiraphTypeValidator.java | 85 ++++++++--- 3 files changed, 131 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/8c42e3f9/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 6c2c5e6..5fbda9a 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,7 +1,9 @@ Giraph Change Log Release 0.2.0 - unreleased - GIRAPH-Fetching locality info in InputSplitPathOrganizer causes jobs to hang (apresta via ereisman) + GIRAPH-431: Support edge and vertex value input formats in GiraphRunner (apresta) + + GIRAPH-477: Fetching locality info in InputSplitPathOrganizer causes jobs to hang (apresta via ereisman) GIRAPH-459: Group Vertex Mutations by Partition ID (claudio) http://git-wip-us.apache.org/repos/asf/giraph/blob/8c42e3f9/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java b/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java index 1edb262..3d0e634 100644 --- 
a/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java +++ b/giraph-core/src/main/java/org/apache/giraph/GiraphRunner.java @@ -17,27 +17,30 @@ */ package org.apache.giraph; +import com.google.common.base.Splitter; +import com.google.common.collect.Iterables; import org.apache.commons.cli.BasicParser; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Options; -import org.apache.giraph.conf.GiraphConfiguration; -import org.apache.giraph.examples.Algorithm; import org.apache.giraph.aggregators.AggregatorWriter; import org.apache.giraph.combiner.Combiner; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.examples.Algorithm; import org.apache.giraph.graph.GiraphJob; import org.apache.giraph.graph.GiraphTypeValidator; -import org.apache.giraph.master.MasterCompute; -import org.apache.giraph.vertex.Vertex; +import org.apache.giraph.io.EdgeInputFormat; import org.apache.giraph.io.VertexInputFormat; import org.apache.giraph.io.VertexOutputFormat; -import org.apache.giraph.worker.WorkerContext; +import org.apache.giraph.io.formats.GiraphFileInputFormat; +import org.apache.giraph.master.MasterCompute; import org.apache.giraph.utils.AnnotationUtils; +import org.apache.giraph.vertex.Vertex; +import org.apache.giraph.worker.WorkerContext; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; @@ -45,9 +48,6 @@ import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.zookeeper.ZooKeeper; -import com.google.common.base.Splitter; -import com.google.common.collect.Iterables; - import java.net.URI; import java.util.List; @@ -66,13 
+66,6 @@ public class GiraphRunner implements Tool { private Configuration conf; /** - * Required options. - */ - private final String [][] requiredOptions = - {{"w", "Need to choose the number of workers (-w)"}, - {"if", "Need to set inputformat (-if)"}}; - - /** * Get the options available. * * @return Options available. @@ -84,10 +77,12 @@ public class GiraphRunner implements Tool { "algorithms"); options.addOption("q", "quiet", false, "Quiet output"); options.addOption("w", "workers", true, "Number of workers"); - options.addOption("if", "inputFormat", true, "Graph inputformat"); - options.addOption("of", "outputFormat", true, "Graph outputformat"); - options.addOption("ip", "inputPath", true, "Graph input path"); - options.addOption("op", "outputPath", true, "Graph output path"); + options.addOption("vif", "vertexInputFormat", true, "Vertex input format"); + options.addOption("eif", "edgeInputFormat", true, "Edge input format"); + options.addOption("of", "outputFormat", true, "Vertex output format"); + options.addOption("vip", "vertexInputPath", true, "Vertex input path"); + options.addOption("eip", "edgeInputPath", true, "Edge input path"); + options.addOption("op", "outputPath", true, "Vertex output path"); options.addOption("c", "combiner", true, "Combiner class"); options.addOption("wc", "workerContext", true, "WorkerContext class"); options.addOption("aw", "aggregatorWriter", true, "AggregatorWriter class"); @@ -157,47 +152,73 @@ public class GiraphRunner implements Tool { LOG.debug("Attempting to run Vertex: " + vertexClassName); } - // Verify all the options have been provided - for (String[] requiredOption : requiredOptions) { - if (!cmd.hasOption(requiredOption[0])) { - if (LOG.isInfoEnabled()) { - LOG.info(requiredOption[1]); - } - return -1; + // Verify all the required options have been provided + if (!cmd.hasOption("w")) { + if (LOG.isInfoEnabled()) { + LOG.info("Need to choose the number of workers (-w)"); + } + return -1; + } + if 
(!cmd.hasOption("vif") && !cmd.hasOption("eif")) { + if (LOG.isInfoEnabled()) { + LOG.info("Need to set an input format (-vif or -eif)"); } + return -1; } int workers = Integer.parseInt(cmd.getOptionValue('w')); + GiraphConfiguration giraphConfiguration = new GiraphConfiguration( getConf()); + giraphConfiguration.setVertexClass( (Class<? extends Vertex>) Class.forName(vertexClassName)); - giraphConfiguration.setVertexInputFormatClass( - (Class<? extends VertexInputFormat>) - Class.forName(cmd.getOptionValue("if"))); - giraphConfiguration.setVertexOutputFormatClass( - (Class<? extends VertexOutputFormat>) - Class.forName(cmd.getOptionValue("of"))); + GiraphJob job = new GiraphJob( giraphConfiguration, "Giraph: " + vertexClassName); - if (cmd.hasOption("ip")) { - FileInputFormat.addInputPath(job.getInternalJob(), - new Path(cmd.getOptionValue("ip"))); - } else { - if (LOG.isInfoEnabled()) { - LOG.info("No input path specified. Ensure your InputFormat does" + - " not require one."); + if (cmd.hasOption("vif")) { + giraphConfiguration.setVertexInputFormatClass( + (Class<? extends VertexInputFormat>) + Class.forName(cmd.getOptionValue("vif"))); + if (cmd.hasOption("vip")) { + GiraphFileInputFormat.addVertexInputPath(job.getInternalJob(), + new Path(cmd.getOptionValue("vip"))); + } else { + if (LOG.isInfoEnabled()) { + LOG.info("No vertex input path specified. Ensure your " + + "VertexInputFormat does not require one."); + } } } - if (cmd.hasOption("op")) { - FileOutputFormat.setOutputPath(job.getInternalJob(), - new Path(cmd.getOptionValue("op"))); - } else { - if (LOG.isInfoEnabled()) { - LOG.info("No output path specified. Ensure your OutputFormat does" + - " not require one."); + if (cmd.hasOption("eif")) { + giraphConfiguration.setEdgeInputFormatClass( + (Class<?
extends EdgeInputFormat>) + Class.forName(cmd.getOptionValue("eif"))); + if (cmd.hasOption("eip")) { + GiraphFileInputFormat.addEdgeInputPath(job.getInternalJob(), + new Path(cmd.getOptionValue("eip"))); + } else { + if (LOG.isInfoEnabled()) { + LOG.info("No edge input path specified. Ensure your " + + "EdgeInputFormat does not require one."); + } + } + } + + if (cmd.hasOption("of")) { + giraphConfiguration.setVertexOutputFormatClass( + (Class<? extends VertexOutputFormat>) + Class.forName(cmd.getOptionValue("of"))); + if (cmd.hasOption("op")) { + FileOutputFormat.setOutputPath(job.getInternalJob(), + new Path(cmd.getOptionValue("op"))); + } else { + if (LOG.isInfoEnabled()) { + LOG.info("No output path specified. Ensure your VertexOutputFormat " + + "does not require one."); + } } } http://git-wip-us.apache.org/repos/asf/giraph/blob/8c42e3f9/giraph-core/src/main/java/org/apache/giraph/graph/GiraphTypeValidator.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/GiraphTypeValidator.java b/giraph-core/src/main/java/org/apache/giraph/graph/GiraphTypeValidator.java index c179a4b..213a69a 100644 --- a/giraph-core/src/main/java/org/apache/giraph/graph/GiraphTypeValidator.java +++ b/giraph-core/src/main/java/org/apache/giraph/graph/GiraphTypeValidator.java @@ -20,6 +20,7 @@ package org.apache.giraph.graph; import org.apache.giraph.bsp.BspUtils; import org.apache.giraph.combiner.Combiner; +import org.apache.giraph.io.EdgeInputFormat; import org.apache.giraph.io.VertexInputFormat; import org.apache.giraph.io.VertexOutputFormat; import org.apache.giraph.utils.ReflectionUtils; @@ -60,6 +61,8 @@ public class GiraphTypeValidator<I extends WritableComparable, private static final int MSG_PARAM_INDEX = 3; /** M param vertex combiner index in classList */ private static final int MSG_COMBINER_PARAM_INDEX = 1; + /** E param edge input format index in classList */ + private static final int
EDGE_PARAM_EDGE_INPUT_FORMAT_INDEX = 1; /** Vertex Index Type */ private Type vertexIndexType; @@ -101,6 +104,7 @@ public class GiraphTypeValidator<I extends WritableComparable, edgeValueType = classList.get(EDGE_PARAM_INDEX); messageValueType = classList.get(MSG_PARAM_INDEX); verifyVertexInputFormatGenericTypes(); + verifyEdgeInputFormatGenericTypes(); verifyVertexOutputFormatGenericTypes(); verifyVertexResolverGenericTypes(); verifyVertexCombinerGenericTypes(); @@ -110,32 +114,63 @@ public class GiraphTypeValidator<I extends WritableComparable, private void verifyVertexInputFormatGenericTypes() { Class<? extends VertexInputFormat<I, V, E, M>> vertexInputFormatClass = BspUtils.<I, V, E, M>getVertexInputFormatClass(conf); - List<Class<?>> classList = - ReflectionUtils.getTypeArguments( - VertexInputFormat.class, vertexInputFormatClass); - if (classList.get(ID_PARAM_INDEX) == null) { - LOG.warn("Input format vertex index type is not known"); - } else if (!vertexIndexType.equals(classList.get(ID_PARAM_INDEX))) { - throw new IllegalArgumentException( - "checkClassTypes: Vertex index types don't match, " + - "vertex - " + vertexIndexType + - ", vertex input format - " + classList.get(ID_PARAM_INDEX)); - } - if (classList.get(VALUE_PARAM_INDEX) == null) { - LOG.warn("Input format vertex value type is not known"); - } else if (!vertexValueType.equals(classList.get(VALUE_PARAM_INDEX))) { - throw new IllegalArgumentException( - "checkClassTypes: Vertex value types don't match, " + - "vertex - " + vertexValueType + - ", vertex input format - " + classList.get(VALUE_PARAM_INDEX)); + if (vertexInputFormatClass != null) { + List<Class<?>> classList = + ReflectionUtils.getTypeArguments( + VertexInputFormat.class, vertexInputFormatClass); + if (classList.get(ID_PARAM_INDEX) == null) { + LOG.warn("Input format vertex index type is not known"); + } else if (!vertexIndexType.equals(classList.get(ID_PARAM_INDEX))) { + throw new IllegalArgumentException( + "checkClassTypes: Vertex 
index types don't match, " + + "vertex - " + vertexIndexType + + ", vertex input format - " + classList.get(ID_PARAM_INDEX)); + } + if (classList.get(VALUE_PARAM_INDEX) == null) { + LOG.warn("Input format vertex value type is not known"); + } else if (!vertexValueType.equals(classList.get(VALUE_PARAM_INDEX))) { + throw new IllegalArgumentException( + "checkClassTypes: Vertex value types don't match, " + + "vertex - " + vertexValueType + + ", vertex input format - " + classList.get(VALUE_PARAM_INDEX)); + } + if (classList.get(EDGE_PARAM_INDEX) == null) { + LOG.warn("Input format edge value type is not known"); + } else if (!edgeValueType.equals(classList.get(EDGE_PARAM_INDEX))) { + throw new IllegalArgumentException( + "checkClassTypes: Edge value types don't match, " + + "vertex - " + edgeValueType + + ", vertex input format - " + classList.get(EDGE_PARAM_INDEX)); + } } - if (classList.get(EDGE_PARAM_INDEX) == null) { - LOG.warn("Input format edge value type is not known"); - } else if (!edgeValueType.equals(classList.get(EDGE_PARAM_INDEX))) { - throw new IllegalArgumentException( - "checkClassTypes: Edge value types don't match, " + - "vertex - " + edgeValueType + - ", vertex input format - " + classList.get(EDGE_PARAM_INDEX)); + } + + /** Verify matching generic types in EdgeInputFormat. */ + private void verifyEdgeInputFormatGenericTypes() { + Class<? 
extends EdgeInputFormat<I, E>> edgeInputFormatClass = + BspUtils.<I, E>getEdgeInputFormatClass(conf); + if (edgeInputFormatClass != null) { + List<Class<?>> classList = + ReflectionUtils.getTypeArguments( + EdgeInputFormat.class, edgeInputFormatClass); + if (classList.get(ID_PARAM_INDEX) == null) { + LOG.warn("Input format vertex index type is not known"); + } else if (!vertexIndexType.equals(classList.get(ID_PARAM_INDEX))) { + throw new IllegalArgumentException( + "checkClassTypes: Vertex index types don't match, " + + "vertex - " + vertexIndexType + + ", edge input format - " + classList.get(ID_PARAM_INDEX)); + } + if (classList.get(EDGE_PARAM_EDGE_INPUT_FORMAT_INDEX) == null) { + LOG.warn("Input format edge value type is not known"); + } else if (!edgeValueType.equals( + classList.get(EDGE_PARAM_EDGE_INPUT_FORMAT_INDEX))) { + throw new IllegalArgumentException( + "checkClassTypes: Edge value types don't match, " + + "vertex - " + edgeValueType + + ", edge input format - " + + classList.get(EDGE_PARAM_EDGE_INPUT_FORMAT_INDEX)); + } } }
