Updated Branches: refs/heads/trunk ff970f2db -> c5a87d161
http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java ---------------------------------------------------------------------- diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java b/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java index 8d67c1d..98be881 100644 --- a/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java +++ b/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java @@ -26,6 +26,7 @@ import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.giraph.conf.GiraphClasses; import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.graph.Vertex; import org.apache.giraph.hive.input.edge.HiveEdgeInputFormat; import org.apache.giraph.hive.input.edge.HiveToEdge; @@ -33,14 +34,20 @@ import org.apache.giraph.hive.input.vertex.HiveToVertex; import org.apache.giraph.hive.input.vertex.HiveVertexInputFormat; import org.apache.giraph.hive.output.HiveVertexOutputFormat; import org.apache.giraph.hive.output.VertexToHive; +import org.apache.giraph.io.formats.multi.EdgeInputFormatDescription; +import org.apache.giraph.io.formats.multi.InputFormatDescription; +import org.apache.giraph.io.formats.multi.MultiEdgeInputFormat; +import org.apache.giraph.io.formats.multi.MultiVertexInputFormat; +import org.apache.giraph.io.formats.multi.VertexInputFormatDescription; import org.apache.giraph.job.GiraphJob; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.log4j.Logger; -import org.apache.thrift.TException; +import com.google.common.base.Splitter; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import java.io.File; @@ -81,11 +88,13 @@ public class HiveGiraphRunner implements Tool { /** vertex class. */ private Class<? extends Vertex> vertexClass; - /** Vertex creator from hive records. */ - private Class<? extends HiveToVertex> hiveToVertexClass; + /** Descriptions of vertex input formats */ + private List<VertexInputFormatDescription> vertexInputDescriptions = + Lists.newArrayList(); - /** Edge creator from hive records. */ - private Class<? extends HiveToEdge> hiveToEdgeClass; + /** Descriptions of edge input formats */ + private List<EdgeInputFormatDescription> edgeInputDescriptions = + Lists.newArrayList(); /** Hive Vertex writer */ private Class<? extends VertexToHive> vertexToHiveClass; @@ -108,51 +117,100 @@ public class HiveGiraphRunner implements Tool { this.vertexClass = vertexClass; } - public Class<? extends HiveToVertex> getHiveToVertexClass() { - return hiveToVertexClass; + public List<VertexInputFormatDescription> getVertexInputDescriptions() { + return vertexInputDescriptions; } /** - * Set HiveToVertex used with HiveVertexInputFormat + * Whether to use vertex input. * - * @param hiveToVertexClass HiveToVertex + * @return true if vertex input enabled (at least one HiveToVertex is set). */ - public void setHiveToVertexClass( - Class<? extends HiveToVertex> hiveToVertexClass) { - this.hiveToVertexClass = hiveToVertexClass; - HIVE_TO_VERTEX_CLASS.set(conf, hiveToVertexClass); + public boolean hasVertexInput() { + return !vertexInputDescriptions.isEmpty(); } /** - * Whether to use vertex input. + * Add vertex input * - * @return true if vertex input enabled (HiveToVertex is set). + * @param hiveToVertexClass HiveToVertex class to use + * @param tableName Table name + * @param partitionFilter Partition filter, or null if no filter used + * @param additionalOptions Additional options, in the form "option=value" */ - public boolean hasVertexValueInput() { - return hiveToVertexClass != null; + public void addVertexInput(Class<? extends HiveToVertex> hiveToVertexClass, + String tableName, String partitionFilter, String ... additionalOptions) { + VertexInputFormatDescription description = + new VertexInputFormatDescription(HiveVertexInputFormat.class); + description.addParameter( + HIVE_TO_VERTEX_CLASS.getKey(), hiveToVertexClass.getName()); + description.addParameter(HIVE_VERTEX_INPUT_PROFILE_ID.getKey(), + "vertex_input_profile_" + vertexInputDescriptions.size()); + description.addParameter( + HIVE_VERTEX_INPUT_TABLE.getKey(), tableName); + if (partitionFilter != null && !partitionFilter.isEmpty()) { + description.addParameter( + HIVE_VERTEX_INPUT_PARTITION.getKey(), partitionFilter); + } + addAdditionalOptions(description, additionalOptions); + vertexInputDescriptions.add(description); } - public Class<? extends HiveToEdge> getHiveToEdgeClass() { - return hiveToEdgeClass; + public List<EdgeInputFormatDescription> getEdgeInputDescriptions() { + return edgeInputDescriptions; } /** * Whether to use edge input. * - * @return true if edge input enabled (HiveToEdge is set). + * @return true if edge input enabled (at least one HiveToEdge is set). */ public boolean hasEdgeInput() { - return hiveToEdgeClass != null; + return !edgeInputDescriptions.isEmpty(); + } + + /** + * Add edge input + * + * @param hiveToEdgeClass HiveToEdge class to use + * @param tableName Table name + * @param partitionFilter Partition filter, or null if no filter used + * @param additionalOptions Additional options, in the form "option=value" + */ + public void addEdgeInput(Class<? extends HiveToEdge> hiveToEdgeClass, + String tableName, String partitionFilter, String ... additionalOptions) { + EdgeInputFormatDescription description = + new EdgeInputFormatDescription(HiveEdgeInputFormat.class); + description.addParameter( + HIVE_TO_EDGE_CLASS.getKey(), hiveToEdgeClass.getName()); + description.addParameter(HIVE_EDGE_INPUT_PROFILE_ID.getKey(), + "edge_input_profile_" + edgeInputDescriptions.size()); + description.addParameter( + HIVE_EDGE_INPUT_TABLE.getKey(), tableName); + if (partitionFilter != null && !partitionFilter.isEmpty()) { + description.addParameter( + HIVE_EDGE_INPUT_PARTITION.getKey(), partitionFilter); + } + addAdditionalOptions(description, additionalOptions); + edgeInputDescriptions.add(description); } /** - * Set HiveToEdge used with HiveEdgeInputFormat + * Add additional options to InputFormatDescription * - * @param hiveToEdgeClass HiveToEdge + * @param description InputFormatDescription + * @param additionalOptions Additional options */ - public void setHiveToEdgeClass(Class<? extends HiveToEdge> hiveToEdgeClass) { - this.hiveToEdgeClass = hiveToEdgeClass; - HIVE_TO_EDGE_CLASS.set(conf, hiveToEdgeClass); + private static void addAdditionalOptions(InputFormatDescription description, + String ... additionalOptions) { + for (String additionalOption : additionalOptions) { + String[] nameValue = split(additionalOption, "="); + if (nameValue.length != 2) { + throw new IllegalStateException("Invalid additional option format " + + additionalOption + ", 'name=value' format expected"); + } + description.addParameter(nameValue[0], nameValue[1]); + } } public Class<? extends VertexToHive> getVertexToHiveClass() { @@ -169,14 +227,22 @@ public class HiveGiraphRunner implements Tool { } /** - * Set class used to write vertices to Hive. + * Set vertex output * * @param vertexToHiveClass class for writing vertices to Hive. + * @param tableName Table name + * @param partitionFilter Partition filter, or null if no filter used */ - public void setVertexToHiveClass( - Class<? extends VertexToHive> vertexToHiveClass) { + public void setVertexOutput( + Class<? extends VertexToHive> vertexToHiveClass, String tableName, + String partitionFilter) { this.vertexToHiveClass = vertexToHiveClass; VERTEX_TO_HIVE_CLASS.set(conf, vertexToHiveClass); + HIVE_VERTEX_OUTPUT_PROFILE_ID.set(conf, "vertex_output_profile"); + HIVE_VERTEX_OUTPUT_TABLE.set(conf, tableName); + if (partitionFilter != null) { + HIVE_VERTEX_OUTPUT_PARTITION.set(conf, partitionFilter); + } } /** @@ -209,9 +275,6 @@ public class HiveGiraphRunner implements Tool { GiraphConfiguration giraphConf = job.getConfiguration(); giraphConf.setVertexClass(vertexClass); - setupHiveInputs(giraphConf); - setupHiveOutput(giraphConf); - giraphConf.setWorkerConfiguration(workers, workers, 100.0f); initGiraphJob(job); @@ -221,42 +284,48 @@ public class HiveGiraphRunner implements Tool { } /** - * Initialize hive input settings - * - * @param conf Configuration to write to - * @throws TException thrift problem + * Prepare vertex input settings in Configuration */ - private void setupHiveInputs(GiraphConfiguration conf) throws TException { - if (hiveToVertexClass != null) { - conf.setVertexInputFormatClass(HiveVertexInputFormat.class); - HIVE_VERTEX_INPUT_PROFILE_ID.set(conf, "vertex_input_profile"); - } - - if (hiveToEdgeClass != null) { - conf.setEdgeInputFormatClass(HiveEdgeInputFormat.class); - HIVE_EDGE_INPUT_PROFILE_ID.set(conf, "edge_input_profile"); + @SuppressWarnings("unchecked") + public void prepareHiveVertexInputs() { + if (vertexInputDescriptions.size() == 1) { + GiraphConstants.VERTEX_INPUT_FORMAT_CLASS.set(conf, + vertexInputDescriptions.get(0).getInputFormatClass()); + vertexInputDescriptions.get(0).putParametersToConfiguration(conf); + } else { + GiraphConstants.VERTEX_INPUT_FORMAT_CLASS.set(conf, + MultiVertexInputFormat.class); + VertexInputFormatDescription.VERTEX_INPUT_FORMAT_DESCRIPTIONS.set(conf, + InputFormatDescription.toJsonString(vertexInputDescriptions)); } } /** - * Initialize hive output settings - * - * @param conf Configuration to write to - * @throws TException thrift problem + * Prepare edge input settings in Configuration */ - private void setupHiveOutput(GiraphConfiguration conf) throws TException { - if (skipOutput) { - LOG.warn("run: Warning - Output will be skipped!"); - } else if (vertexToHiveClass != null) { - conf.setVertexOutputFormatClass(HiveVertexOutputFormat.class); - HIVE_VERTEX_OUTPUT_PROFILE_ID.set(conf, "vertex_output_profile"); + @SuppressWarnings("unchecked") + public void prepareHiveEdgeInputs() { + if (edgeInputDescriptions.size() == 1) { + GiraphConstants.EDGE_INPUT_FORMAT_CLASS.set(conf, + edgeInputDescriptions.get(0).getInputFormatClass()); + edgeInputDescriptions.get(0).putParametersToConfiguration(conf); } else { - LOG.fatal("output requested but " + VertexToHive.class.getSimpleName() + - " not set"); + GiraphConstants.EDGE_INPUT_FORMAT_CLASS.set(conf, + MultiEdgeInputFormat.class); + EdgeInputFormatDescription.EDGE_INPUT_FORMAT_DESCRIPTIONS.set(conf, + InputFormatDescription.toJsonString(edgeInputDescriptions)); } } /** + * Prepare output settings in Configuration + */ + public void prepareHiveOutput() { + GiraphConstants.VERTEX_OUTPUT_FORMAT_CLASS.set(conf, + HiveVertexOutputFormat.class); + } + + /** * set hive configuration */ private void adjustConfigurationForHive() { @@ -312,47 +381,65 @@ public class HiveGiraphRunner implements Tool { " class name (-vertexClass) to use"); } - String hiveToVertexClassStr = cmdln.getOptionValue("hiveToVertexClass"); - if (hiveToVertexClassStr != null) { - if (hiveToVertexClassStr.equals("disable")) { - hiveToVertexClass = null; - } else { - setHiveToVertexClass( - findClass(hiveToVertexClassStr, HiveToVertex.class)); + String[] vertexInputs = cmdln.getOptionValues("vertexInput"); + if (vertexInputs != null && vertexInputs.length != 0) { + vertexInputDescriptions.clear(); + for (String vertexInput : vertexInputs) { + String[] parameters = split(vertexInput, ","); + if (parameters.length < 2) { + throw new IllegalStateException("Illegal vertex input description " + + vertexInput + " - HiveToVertex class and table name needed"); + } + addVertexInput(findClass(parameters[0], HiveToVertex.class), + parameters[1], elementOrNull(parameters, 2), + copyOfArray(parameters, 3)); } } - String hiveToEdgeClassStr = cmdln.getOptionValue("hiveToEdgeClass"); - if (hiveToEdgeClassStr != null) { - if (hiveToEdgeClassStr.equals("disable")) { - hiveToEdgeClass = null; - } else { - setHiveToEdgeClass( - findClass(hiveToEdgeClassStr, HiveToEdge.class)); + String[] edgeInputs = cmdln.getOptionValues("edgeInput"); + if (edgeInputs != null && edgeInputs.length != 0) { + edgeInputDescriptions.clear(); + for (String edgeInput : edgeInputs) { + String[] parameters = split(edgeInput, ","); + if (parameters.length < 2) { + throw new IllegalStateException("Illegal edge input description " + + edgeInput + " - HiveToEdge class and table name needed"); + } + addEdgeInput(findClass(parameters[0], HiveToEdge.class), + parameters[1], elementOrNull(parameters, 2), + copyOfArray(parameters, 3)); } } - String vertexToHiveClassStr = cmdln.getOptionValue("vertexToHiveClass"); - if (vertexToHiveClassStr != null) { - setVertexToHiveClass(findClass(vertexToHiveClassStr, VertexToHive.class)); + String output = cmdln.getOptionValue("output"); + if (output != null) { + // Partition filter can contain commas so we limit the number of times + // we split + String[] parameters = split(output, ",", 3); + if (parameters.length < 2) { + throw new IllegalStateException("Illegal output description " + + output + " - VertexToHive class and table name needed"); + } + setVertexOutput(findClass(parameters[0], VertexToHive.class), + parameters[1], elementOrNull(parameters, 2)); } if (cmdln.hasOption("skipOutput")) { skipOutput = true; } - if (hiveToVertexClass == null && hiveToEdgeClass == null) { + if (!hasVertexInput() && !hasEdgeInput()) { throw new IllegalArgumentException( "Need at least one of Giraph " + HiveToVertex.class.getSimpleName() + - " class name (-hiveToVertexClass) and " + + " (-vertexInput) and " + HiveToEdge.class.getSimpleName() + - " class name (-hiveToEdgeClass)"); + " (-edgeInput)"); } if (vertexToHiveClass == null && !skipOutput) { throw new IllegalArgumentException( "Need the Giraph " + VertexToHive.class.getSimpleName() + - " class name (-vertexToHiveClass) to use"); + " (-output) to use"); } String workersStr = cmdln.getOptionValue("workers"); if (workersStr == null) { @@ -360,40 +447,24 @@ public class HiveGiraphRunner implements Tool { "Need to choose the number of workers (-w)"); } - String vertexInputTableStr = cmdln.getOptionValue("vertexInputTable"); - if (vertexInputTableStr == null && hiveToVertexClass != null) { - throw new IllegalArgumentException( - "Need to set the vertex input table name (-vi)"); - } + String dbName = cmdln.getOptionValue("dbName", "default"); - String edgeInputTableStr = cmdln.getOptionValue("edgeInputTable"); - if (edgeInputTableStr == null && hiveToEdgeClass != null) { - throw new IllegalArgumentException( - "Need to set the edge input table name (-ei)"); + if (hasVertexInput()) { + HIVE_VERTEX_INPUT_DATABASE.set(conf, dbName); + prepareHiveVertexInputs(); } - String outputTableStr = cmdln.getOptionValue("outputTable"); - if (outputTableStr == null) { - throw new IllegalArgumentException( - "Need to set the output table name (-o)"); + if (hasEdgeInput()) { + HIVE_EDGE_INPUT_DATABASE.set(conf, dbName); + prepareHiveEdgeInputs(); } - String dbName = cmdln.getOptionValue("dbName", "default"); - - HIVE_VERTEX_INPUT_DATABASE.set(conf, dbName); - HIVE_VERTEX_INPUT_TABLE.set(conf, vertexInputTableStr); - HIVE_VERTEX_INPUT_PARTITION.set(conf, - cmdln.getOptionValue("vertexInputFilter")); - - HIVE_EDGE_INPUT_DATABASE.set(conf, dbName); - HIVE_EDGE_INPUT_TABLE.set(conf, edgeInputTableStr); - HIVE_EDGE_INPUT_PARTITION.set(conf, - cmdln.getOptionValue("edgeInputFilter")); - - HIVE_VERTEX_OUTPUT_DATABASE.set(conf, dbName); - HIVE_VERTEX_OUTPUT_TABLE.set(conf, cmdln.getOptionValue("outputTable")); - HIVE_VERTEX_OUTPUT_PARTITION.set(conf, - cmdln.getOptionValue("outputPartition")); + if (!skipOutput) { + HIVE_VERTEX_OUTPUT_DATABASE.set(conf, dbName); + prepareHiveOutput(); + } else { + LOG.warn("run: Warning - Output will be skipped!"); + } workers = Integer.parseInt(workersStr); @@ -447,42 +518,44 @@ public class HiveGiraphRunner implements Tool { options.addOption("db", "dbName", true, "Hive database name"); // Vertex input settings - options.addOption(null, "hiveToVertexClass", true, - "Giraph " + HiveToVertex.class.getSimpleName() + - " class to use (default - " + - (hiveToVertexClass == null ? "not used" : - hiveToVertexClass.getSimpleName()) + "), " + - "\"disable\" will unset this option"); - options.addOption("vi", "vertexInputTable", true, - "Vertex input table name"); - options.addOption("VI", "vertexInputFilter", true, - "Vertex input table filter expression (e.g., \"a<2 AND b='two'\""); + options.addOption("vi", "vertexInput", true, getInputOptionDescription( + "vertex", HiveToVertex.class.getSimpleName())); // Edge input settings - options.addOption(null, "hiveToEdgeClass", true, - "Giraph " + HiveToEdge.class.getSimpleName() + - " class to use (default - " + - (hiveToEdgeClass == null ? "not used" : - hiveToEdgeClass.getSimpleName()) + "), " + - "\"disable\" will unset this option"); - options.addOption("ei", "edgeInputTable", true, - "Edge input table name"); - options.addOption("EI", "edgeInputFilter", true, - "Edge input table filter expression (e.g., \"a<2 AND b='two'\""); + options.addOption("ei", "edgeInput", true, getInputOptionDescription( + "edge", HiveToEdge.class.getSimpleName())); // Vertex output settings - if (vertexToHiveClass == null) { - options.addOption(null, "vertexToHiveClass", true, - "Giraph " + VertexToHive.class.getSimpleName() + " class to use"); - } - - options.addOption("o", "outputTable", true, "Output table name"); - options.addOption("O", "outputPartition", true, - "Output table partition values (e.g., \"a=1,b=two\")"); + options.addOption("o", "output", true, + "Giraph " + VertexToHive.class.getSimpleName() + " class to use," + + " table name and partition filter (optional). Example:\n" + + "\"MyVertexToHive, myTableName, a=1,b=two\""); options.addOption("s", "skipOutput", false, "Skip output?"); } /** + * Get description for the input format option (vertex or edge). + * + * @param inputType Type of input (vertex or edge) + * @param hiveToObjectClassName HiveToVertex or HiveToEdge + * @return Description for the input format option + */ + private static String getInputOptionDescription(String inputType, + String hiveToObjectClassName) { + StringBuilder inputOption = new StringBuilder(); + inputOption.append("Giraph ").append(hiveToObjectClassName).append( + " class to use, table name and partition filter (optional)."); + inputOption.append(" Additional options for the input format can be " + + "specified as well."); + inputOption.append(" You can set as many ").append(inputType).append( + " inputs as you like."); + inputOption.append(" Example:\n"); + inputOption.append("\"My").append(hiveToObjectClassName).append( + ", myTableName, a<2 AND b='two', option1=value1, option2=value2\""); + return inputOption.toString(); + } + + /** * add string to collection * @param conf Configuration * @param name name to add @@ -533,7 +606,7 @@ public class HiveGiraphRunner implements Tool { @Override public final void setConf(Configuration conf) { - this.conf = conf; + this.conf = new GiraphConfiguration(conf); } /** @@ -569,35 +642,83 @@ public class HiveGiraphRunner implements Tool { * @param giraphConf GiraphConfiguration */ private void logOptions(GiraphConfiguration giraphConf) { - GiraphClasses classes = new GiraphClasses(giraphConf); + GiraphClasses<?, ?, ?, ?> classes = new GiraphClasses(giraphConf); LOG.info(getClass().getSimpleName() + " with"); LOG.info(LOG_PREFIX + "-vertexClass=" + vertexClass.getCanonicalName()); - if (hiveToVertexClass != null) { - LOG.info(LOG_PREFIX + "-hiveToVertexClass=" + - hiveToVertexClass.getCanonicalName()); - } - if (classes.getVertexInputFormatClass() != null) { - LOG.info(LOG_PREFIX + "-vertexInputFormatClass=" + - classes.getVertexInputFormatClass().getCanonicalName()); + for (VertexInputFormatDescription description : vertexInputDescriptions) { + LOG.info(LOG_PREFIX + "Vertex input: " + description); } - if (hiveToEdgeClass != null) { - LOG.info(LOG_PREFIX + "-hiveToEdgeClass=" + - hiveToEdgeClass.getCanonicalName()); - } - if (classes.getEdgeInputFormatClass() != null) { - LOG.info(LOG_PREFIX + "-edgeInputFormatClass=" + - classes.getEdgeInputFormatClass().getCanonicalName()); + for (EdgeInputFormatDescription description : edgeInputDescriptions) { + LOG.info(LOG_PREFIX + "Edge input: " + description); } if (classes.getVertexOutputFormatClass() != null) { - LOG.info(LOG_PREFIX + "-outputFormatClass=" + - classes.getVertexOutputFormatClass().getCanonicalName()); + LOG.info(LOG_PREFIX + "Output: VertexToHive=" + + vertexToHiveClass.getCanonicalName() + ", table=" + + HIVE_VERTEX_OUTPUT_TABLE.get(conf) + ", partition=\"" + + HIVE_VERTEX_OUTPUT_PARTITION.get(conf) + "\""); } LOG.info(LOG_PREFIX + "-workers=" + workers); } + + /** + * Split a string using separator and trim the results + * + * @param stringToSplit String to split + * @param separator Separator + * @return Separated strings, trimmed + */ + private static String[] split(String stringToSplit, String separator) { + return split(stringToSplit, separator, -1); + } + + /** + * Split a string using separator and trim the results + * + * @param stringToSplit String to split + * @param separator Separator + * @param limit See {@link String#split(String, int)} + * @return Separated strings, trimmed + */ + private static String[] split(String stringToSplit, String separator, + int limit) { + Splitter splitter = Splitter.on(separator).trimResults(); + if (limit > 0) { + splitter = splitter.limit(limit); + } + return Iterables.toArray(splitter.split(stringToSplit), String.class); + } + + /** + * Get the element in array at certain position, or null if the position is + * out of array size + * + * @param array Array + * @param position Position + * @return Element at the position or null if the position is out of array + */ + private static String elementOrNull(String[] array, int position) { + return (position < array.length) ? array[position] : null; + } + + /** + * Return a copy of array from some position to the end, + * or empty array if startIndex is out of array size + * + * @param array Array to take a copy from + * @param startIndex Starting position + * @return Copy of part of the array + */ + private static String[] copyOfArray(String[] array, int startIndex) { + if (array.length <= startIndex) { + return new String[0]; + } else { + return Arrays.copyOfRange(array, startIndex, array.length); + } + } }
