Updated Branches:
  refs/heads/trunk ff970f2db -> c5a87d161

http://git-wip-us.apache.org/repos/asf/giraph/blob/c5a87d16/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
----------------------------------------------------------------------
diff --git 
a/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java 
b/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
index 8d67c1d..98be881 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
@@ -26,6 +26,7 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.giraph.conf.GiraphClasses;
 import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.graph.Vertex;
 import org.apache.giraph.hive.input.edge.HiveEdgeInputFormat;
 import org.apache.giraph.hive.input.edge.HiveToEdge;
@@ -33,14 +34,20 @@ import org.apache.giraph.hive.input.vertex.HiveToVertex;
 import org.apache.giraph.hive.input.vertex.HiveVertexInputFormat;
 import org.apache.giraph.hive.output.HiveVertexOutputFormat;
 import org.apache.giraph.hive.output.VertexToHive;
+import org.apache.giraph.io.formats.multi.EdgeInputFormatDescription;
+import org.apache.giraph.io.formats.multi.InputFormatDescription;
+import org.apache.giraph.io.formats.multi.MultiEdgeInputFormat;
+import org.apache.giraph.io.formats.multi.MultiVertexInputFormat;
+import org.apache.giraph.io.formats.multi.VertexInputFormatDescription;
 import org.apache.giraph.job.GiraphJob;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.log4j.Logger;
-import org.apache.thrift.TException;
 
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 
 import java.io.File;
@@ -81,11 +88,13 @@ public class HiveGiraphRunner implements Tool {
   /** vertex class. */
   private Class<? extends Vertex> vertexClass;
 
-  /** Vertex creator from hive records. */
-  private Class<? extends HiveToVertex> hiveToVertexClass;
+  /** Descriptions of vertex input formats */
+  private List<VertexInputFormatDescription> vertexInputDescriptions =
+      Lists.newArrayList();
 
-  /** Edge creator from hive records. */
-  private Class<? extends HiveToEdge> hiveToEdgeClass;
+  /** Descriptions of edge input formats */
+  private List<EdgeInputFormatDescription> edgeInputDescriptions =
+      Lists.newArrayList();
 
   /** Hive Vertex writer */
   private Class<? extends VertexToHive> vertexToHiveClass;
@@ -108,51 +117,100 @@ public class HiveGiraphRunner implements Tool {
     this.vertexClass = vertexClass;
   }
 
-  public Class<? extends HiveToVertex> getHiveToVertexClass() {
-    return hiveToVertexClass;
+  public List<VertexInputFormatDescription> getVertexInputDescriptions() {
+    return vertexInputDescriptions;
   }
 
   /**
-   * Set HiveToVertex used with HiveVertexInputFormat
+   * Whether to use vertex input.
    *
-   * @param hiveToVertexClass HiveToVertex
+   * @return true if vertex input enabled (at least one HiveToVertex is set).
    */
-  public void setHiveToVertexClass(
-      Class<? extends HiveToVertex> hiveToVertexClass) {
-    this.hiveToVertexClass = hiveToVertexClass;
-    HIVE_TO_VERTEX_CLASS.set(conf, hiveToVertexClass);
+  public boolean hasVertexInput() {
+    return !vertexInputDescriptions.isEmpty();
   }
 
   /**
-   * Whether to use vertex input.
+   * Add vertex input
    *
-   * @return true if vertex input enabled (HiveToVertex is set).
+   * @param hiveToVertexClass HiveToVertex class to use
+   * @param tableName Table name
+   * @param partitionFilter Partition filter, or null if no filter used
+   * @param additionalOptions Additional options, in the form "option=value"
    */
-  public boolean hasVertexValueInput() {
-    return hiveToVertexClass != null;
+  public void addVertexInput(Class<? extends HiveToVertex> hiveToVertexClass,
+      String tableName, String partitionFilter, String ... additionalOptions) {
+    VertexInputFormatDescription description =
+        new VertexInputFormatDescription(HiveVertexInputFormat.class);
+    description.addParameter(
+        HIVE_TO_VERTEX_CLASS.getKey(), hiveToVertexClass.getName());
+    description.addParameter(HIVE_VERTEX_INPUT_PROFILE_ID.getKey(),
+        "vertex_input_profile_" + vertexInputDescriptions.size());
+    description.addParameter(
+        HIVE_VERTEX_INPUT_TABLE.getKey(), tableName);
+    if (partitionFilter != null && !partitionFilter.isEmpty()) {
+      description.addParameter(
+          HIVE_VERTEX_INPUT_PARTITION.getKey(), partitionFilter);
+    }
+    addAdditionalOptions(description, additionalOptions);
+    vertexInputDescriptions.add(description);
   }
 
-  public Class<? extends HiveToEdge> getHiveToEdgeClass() {
-    return hiveToEdgeClass;
+  public List<EdgeInputFormatDescription> getEdgeInputDescriptions() {
+    return edgeInputDescriptions;
   }
 
   /**
    * Whether to use edge input.
    *
-   * @return true if edge input enabled (HiveToEdge is set).
+   * @return true if edge input enabled (at least one HiveToEdge is set).
    */
   public boolean hasEdgeInput() {
-    return hiveToEdgeClass != null;
+    return !edgeInputDescriptions.isEmpty();
+  }
+
+  /**
+   * Add edge input
+   *
+   * @param hiveToEdgeClass HiveToEdge class to use
+   * @param tableName Table name
+   * @param partitionFilter Partition filter, or null if no filter used
+   * @param additionalOptions Additional options, in the form "option=value"
+   */
+  public void addEdgeInput(Class<? extends HiveToEdge> hiveToEdgeClass,
+      String tableName, String partitionFilter, String ... additionalOptions) {
+    EdgeInputFormatDescription description =
+        new EdgeInputFormatDescription(HiveEdgeInputFormat.class);
+    description.addParameter(
+        HIVE_TO_EDGE_CLASS.getKey(), hiveToEdgeClass.getName());
+    description.addParameter(HIVE_EDGE_INPUT_PROFILE_ID.getKey(),
+        "edge_input_profile_" + edgeInputDescriptions.size());
+    description.addParameter(
+        HIVE_EDGE_INPUT_TABLE.getKey(), tableName);
+    if (partitionFilter != null && !partitionFilter.isEmpty()) {
+      description.addParameter(
+          HIVE_EDGE_INPUT_PARTITION.getKey(), partitionFilter);
+    }
+    addAdditionalOptions(description, additionalOptions);
+    edgeInputDescriptions.add(description);
   }
 
   /**
-   * Set HiveToEdge used with HiveEdgeInputFormat
+   * Add additional options to InputFormatDescription
    *
-   * @param hiveToEdgeClass HiveToEdge
+   * @param description InputFormatDescription
+   * @param additionalOptions Additional options
    */
-  public void setHiveToEdgeClass(Class<? extends HiveToEdge> hiveToEdgeClass) {
-    this.hiveToEdgeClass = hiveToEdgeClass;
-    HIVE_TO_EDGE_CLASS.set(conf, hiveToEdgeClass);
+  private static void addAdditionalOptions(InputFormatDescription description,
+      String ... additionalOptions) {
+    for (String additionalOption : additionalOptions) {
+      String[] nameValue = split(additionalOption, "=");
+      if (nameValue.length != 2) {
+        throw new IllegalStateException("Invalid additional option format " +
+            additionalOption + ", 'name=value' format expected");
+      }
+      description.addParameter(nameValue[0], nameValue[1]);
+    }
   }
 
   public Class<? extends VertexToHive> getVertexToHiveClass() {
@@ -169,14 +227,22 @@ public class HiveGiraphRunner implements Tool {
   }
 
   /**
-   * Set class used to write vertices to Hive.
+   * Set vertex output
    *
    * @param vertexToHiveClass class for writing vertices to Hive.
+   * @param tableName Table name
+   * @param partitionFilter Partition filter, or null if no filter used
    */
-  public void setVertexToHiveClass(
-      Class<? extends VertexToHive> vertexToHiveClass) {
+  public void setVertexOutput(
+      Class<? extends VertexToHive> vertexToHiveClass, String tableName,
+      String partitionFilter) {
     this.vertexToHiveClass = vertexToHiveClass;
     VERTEX_TO_HIVE_CLASS.set(conf, vertexToHiveClass);
+    HIVE_VERTEX_OUTPUT_PROFILE_ID.set(conf, "vertex_output_profile");
+    HIVE_VERTEX_OUTPUT_TABLE.set(conf, tableName);
+    if (partitionFilter != null) {
+      HIVE_VERTEX_OUTPUT_PARTITION.set(conf, partitionFilter);
+    }
   }
 
   /**
@@ -209,9 +275,6 @@ public class HiveGiraphRunner implements Tool {
     GiraphConfiguration giraphConf = job.getConfiguration();
     giraphConf.setVertexClass(vertexClass);
 
-    setupHiveInputs(giraphConf);
-    setupHiveOutput(giraphConf);
-
     giraphConf.setWorkerConfiguration(workers, workers, 100.0f);
     initGiraphJob(job);
 
@@ -221,42 +284,48 @@ public class HiveGiraphRunner implements Tool {
   }
 
   /**
-   * Initialize hive input settings
-   *
-   * @param conf Configuration to write to
-   * @throws TException thrift problem
+   * Prepare vertex input settings in Configuration
    */
-  private void setupHiveInputs(GiraphConfiguration conf) throws TException {
-    if (hiveToVertexClass != null) {
-      conf.setVertexInputFormatClass(HiveVertexInputFormat.class);
-      HIVE_VERTEX_INPUT_PROFILE_ID.set(conf, "vertex_input_profile");
-    }
-
-    if (hiveToEdgeClass != null) {
-      conf.setEdgeInputFormatClass(HiveEdgeInputFormat.class);
-      HIVE_EDGE_INPUT_PROFILE_ID.set(conf, "edge_input_profile");
+  @SuppressWarnings("unchecked")
+  public void prepareHiveVertexInputs() {
+    if (vertexInputDescriptions.size() == 1) {
+      GiraphConstants.VERTEX_INPUT_FORMAT_CLASS.set(conf,
+          vertexInputDescriptions.get(0).getInputFormatClass());
+      vertexInputDescriptions.get(0).putParametersToConfiguration(conf);
+    } else {
+      GiraphConstants.VERTEX_INPUT_FORMAT_CLASS.set(conf,
+          MultiVertexInputFormat.class);
+      VertexInputFormatDescription.VERTEX_INPUT_FORMAT_DESCRIPTIONS.set(conf,
+          InputFormatDescription.toJsonString(vertexInputDescriptions));
     }
   }
 
   /**
-   * Initialize hive output settings
-   *
-   * @param conf Configuration to write to
-   * @throws TException thrift problem
+   * Prepare edge input settings in Configuration
    */
-  private void setupHiveOutput(GiraphConfiguration conf) throws TException {
-    if (skipOutput) {
-      LOG.warn("run: Warning - Output will be skipped!");
-    } else if (vertexToHiveClass != null) {
-      conf.setVertexOutputFormatClass(HiveVertexOutputFormat.class);
-      HIVE_VERTEX_OUTPUT_PROFILE_ID.set(conf, "vertex_output_profile");
+  @SuppressWarnings("unchecked")
+  public void prepareHiveEdgeInputs() {
+    if (edgeInputDescriptions.size() == 1) {
+      GiraphConstants.EDGE_INPUT_FORMAT_CLASS.set(conf,
+          edgeInputDescriptions.get(0).getInputFormatClass());
+      edgeInputDescriptions.get(0).putParametersToConfiguration(conf);
     } else {
-      LOG.fatal("output requested but " + VertexToHive.class.getSimpleName() +
-          " not set");
+      GiraphConstants.EDGE_INPUT_FORMAT_CLASS.set(conf,
+          MultiEdgeInputFormat.class);
+      EdgeInputFormatDescription.EDGE_INPUT_FORMAT_DESCRIPTIONS.set(conf,
+          InputFormatDescription.toJsonString(edgeInputDescriptions));
     }
   }
 
   /**
+   * Prepare output settings in Configuration
+   */
+  public void prepareHiveOutput() {
+    GiraphConstants.VERTEX_OUTPUT_FORMAT_CLASS.set(conf,
+        HiveVertexOutputFormat.class);
+  }
+
+  /**
    * set hive configuration
    */
   private void adjustConfigurationForHive() {
@@ -312,47 +381,65 @@ public class HiveGiraphRunner implements Tool {
               " class name (-vertexClass) to use");
     }
 
-    String hiveToVertexClassStr = cmdln.getOptionValue("hiveToVertexClass");
-    if (hiveToVertexClassStr != null) {
-      if (hiveToVertexClassStr.equals("disable")) {
-        hiveToVertexClass = null;
-      } else {
-        setHiveToVertexClass(
-            findClass(hiveToVertexClassStr, HiveToVertex.class));
+    String[] vertexInputs = cmdln.getOptionValues("vertexInput");
+    if (vertexInputs != null && vertexInputs.length != 0) {
+      vertexInputDescriptions.clear();
+      for (String vertexInput : vertexInputs) {
+        String[] parameters = split(vertexInput, ",");
+        if (parameters.length < 2) {
+          throw new IllegalStateException("Illegal vertex input description " +
+              vertexInput + " - HiveToVertex class and table name needed");
+        }
+        addVertexInput(findClass(parameters[0], HiveToVertex.class),
+            parameters[1], elementOrNull(parameters, 2),
+            copyOfArray(parameters, 3));
       }
     }
 
-    String hiveToEdgeClassStr = cmdln.getOptionValue("hiveToEdgeClass");
-    if (hiveToEdgeClassStr != null) {
-      if (hiveToEdgeClassStr.equals("disable")) {
-        hiveToEdgeClass = null;
-      } else {
-        setHiveToEdgeClass(
-            findClass(hiveToEdgeClassStr, HiveToEdge.class));
+    String[] edgeInputs = cmdln.getOptionValues("edgeInput");
+    if (edgeInputs != null && edgeInputs.length != 0) {
+      edgeInputDescriptions.clear();
+      for (String edgeInput : edgeInputs) {
+        String[] parameters = split(edgeInput, ",");
+        if (parameters.length < 2) {
+          throw new IllegalStateException("Illegal edge input description " +
+              edgeInput + " - HiveToEdge class and table name needed");
+        }
+        addEdgeInput(findClass(parameters[0], HiveToEdge.class),
+            parameters[1], elementOrNull(parameters, 2),
+            copyOfArray(parameters, 3));
       }
     }
 
-    String vertexToHiveClassStr = cmdln.getOptionValue("vertexToHiveClass");
-    if (vertexToHiveClassStr != null) {
-      setVertexToHiveClass(findClass(vertexToHiveClassStr, 
VertexToHive.class));
+    String output = cmdln.getOptionValue("output");
+    if (output != null) {
+      // Partition filter can contain commas so we limit the number of times
+      // we split
+      String[] parameters = split(output, ",", 3);
+      if (parameters.length < 2) {
+        throw new IllegalStateException("Illegal output description " +
+            output + " - VertexToHive class and table name needed");
+      }
+      setVertexOutput(findClass(parameters[0], VertexToHive.class),
+          parameters[1], elementOrNull(parameters, 2));
     }
 
     if (cmdln.hasOption("skipOutput")) {
       skipOutput = true;
     }
 
-    if (hiveToVertexClass == null && hiveToEdgeClass == null) {
+    if (!hasVertexInput() && !hasEdgeInput()) {
       throw new IllegalArgumentException(
           "Need at least one of Giraph " +
           HiveToVertex.class.getSimpleName() +
-          " class name (-hiveToVertexClass) and " +
+          " (-vertexInput) and " +
           HiveToEdge.class.getSimpleName() +
-          " class name (-hiveToEdgeClass)");
+          " (-edgeInput)");
     }
     if (vertexToHiveClass == null && !skipOutput) {
       throw new IllegalArgumentException(
           "Need the Giraph " + VertexToHive.class.getSimpleName() +
-          " class name (-vertexToHiveClass) to use");
+          " (-output) to use");
     }
     String workersStr = cmdln.getOptionValue("workers");
     if (workersStr == null) {
@@ -360,40 +447,24 @@ public class HiveGiraphRunner implements Tool {
           "Need to choose the number of workers (-w)");
     }
 
-    String vertexInputTableStr = cmdln.getOptionValue("vertexInputTable");
-    if (vertexInputTableStr == null && hiveToVertexClass != null) {
-      throw new IllegalArgumentException(
-          "Need to set the vertex input table name (-vi)");
-    }
+    String dbName = cmdln.getOptionValue("dbName", "default");
 
-    String edgeInputTableStr = cmdln.getOptionValue("edgeInputTable");
-    if (edgeInputTableStr == null && hiveToEdgeClass != null) {
-      throw new IllegalArgumentException(
-          "Need to set the edge input table name (-ei)");
+    if (hasVertexInput()) {
+      HIVE_VERTEX_INPUT_DATABASE.set(conf, dbName);
+      prepareHiveVertexInputs();
     }
 
-    String outputTableStr = cmdln.getOptionValue("outputTable");
-    if (outputTableStr == null) {
-      throw new IllegalArgumentException(
-          "Need to set the output table name (-o)");
+    if (hasEdgeInput()) {
+      HIVE_EDGE_INPUT_DATABASE.set(conf, dbName);
+      prepareHiveEdgeInputs();
     }
 
-    String dbName = cmdln.getOptionValue("dbName", "default");
-
-    HIVE_VERTEX_INPUT_DATABASE.set(conf, dbName);
-    HIVE_VERTEX_INPUT_TABLE.set(conf, vertexInputTableStr);
-    HIVE_VERTEX_INPUT_PARTITION.set(conf,
-        cmdln.getOptionValue("vertexInputFilter"));
-
-    HIVE_EDGE_INPUT_DATABASE.set(conf, dbName);
-    HIVE_EDGE_INPUT_TABLE.set(conf, edgeInputTableStr);
-    HIVE_EDGE_INPUT_PARTITION.set(conf,
-        cmdln.getOptionValue("edgeInputFilter"));
-
-    HIVE_VERTEX_OUTPUT_DATABASE.set(conf, dbName);
-    HIVE_VERTEX_OUTPUT_TABLE.set(conf, cmdln.getOptionValue("outputTable"));
-    HIVE_VERTEX_OUTPUT_PARTITION.set(conf,
-        cmdln.getOptionValue("outputPartition"));
+    if (!skipOutput) {
+      HIVE_VERTEX_OUTPUT_DATABASE.set(conf, dbName);
+      prepareHiveOutput();
+    } else {
+      LOG.warn("run: Warning - Output will be skipped!");
+    }
 
     workers = Integer.parseInt(workersStr);
 
@@ -447,42 +518,44 @@ public class HiveGiraphRunner implements Tool {
     options.addOption("db", "dbName", true, "Hive database name");
 
     // Vertex input settings
-    options.addOption(null, "hiveToVertexClass", true,
-        "Giraph " + HiveToVertex.class.getSimpleName() +
-            " class to use (default - " +
-            (hiveToVertexClass == null ? "not used" :
-                hiveToVertexClass.getSimpleName()) + "), " +
-            "\"disable\" will unset this option");
-    options.addOption("vi", "vertexInputTable", true,
-        "Vertex input table name");
-    options.addOption("VI", "vertexInputFilter", true,
-        "Vertex input table filter expression (e.g., \"a<2 AND b='two'\"");
+    options.addOption("vi", "vertexInput", true, getInputOptionDescription(
+        "vertex", HiveToVertex.class.getSimpleName()));
 
     // Edge input settings
-    options.addOption(null, "hiveToEdgeClass", true,
-        "Giraph " + HiveToEdge.class.getSimpleName() +
-            " class to use (default - " +
-            (hiveToEdgeClass == null ? "not used" :
-                hiveToEdgeClass.getSimpleName()) + "), " +
-            "\"disable\" will unset this option");
-    options.addOption("ei", "edgeInputTable", true,
-        "Edge input table name");
-    options.addOption("EI", "edgeInputFilter", true,
-        "Edge input table filter expression (e.g., \"a<2 AND b='two'\"");
+    options.addOption("ei", "edgeInput", true, getInputOptionDescription(
+        "edge", HiveToEdge.class.getSimpleName()));
 
     // Vertex output settings
-    if (vertexToHiveClass == null) {
-      options.addOption(null, "vertexToHiveClass", true,
-          "Giraph " + VertexToHive.class.getSimpleName() + " class to use");
-    }
-
-    options.addOption("o", "outputTable", true, "Output table name");
-    options.addOption("O", "outputPartition", true,
-        "Output table partition values (e.g., \"a=1,b=two\")");
+    options.addOption("o", "output", true,
+        "Giraph " + VertexToHive.class.getSimpleName() + " class to use," +
+            " table name and partition filter (optional). Example:\n" +
+            "\"MyVertexToHive, myTableName, a=1,b=two\"");
     options.addOption("s", "skipOutput", false, "Skip output?");
   }
 
   /**
+   * Get description for the input format option (vertex or edge).
+   *
+   * @param inputType Type of input (vertex or edge)
+   * @param hiveToObjectClassName HiveToVertex or HiveToEdge
+   * @return Description for the input format option
+   */
+  private static String getInputOptionDescription(String inputType,
+      String hiveToObjectClassName) {
+    StringBuilder inputOption = new StringBuilder();
+    inputOption.append("Giraph ").append(hiveToObjectClassName).append(
+        " class to use, table name and partition filter (optional).");
+    inputOption.append(" Additional options for the input format can be " +
+        "specified as well.");
+    inputOption.append(" You can set as many ").append(inputType).append(
+        " inputs as you like.");
+    inputOption.append(" Example:\n");
+    inputOption.append("\"My").append(hiveToObjectClassName).append(
+        ", myTableName, a<2 AND b='two', option1=value1, option2=value2\"");
+    return inputOption.toString();
+  }
+
+  /**
    * add string to collection
    * @param conf Configuration
    * @param name name to add
@@ -533,7 +606,7 @@ public class HiveGiraphRunner implements Tool {
 
   @Override
   public final void setConf(Configuration conf) {
-    this.conf = conf;
+    this.conf = new GiraphConfiguration(conf);
   }
 
   /**
@@ -569,35 +642,83 @@ public class HiveGiraphRunner implements Tool {
    * @param giraphConf GiraphConfiguration
    */
   private void logOptions(GiraphConfiguration giraphConf) {
-    GiraphClasses classes = new GiraphClasses(giraphConf);
+    GiraphClasses<?, ?, ?, ?> classes = new GiraphClasses(giraphConf);
 
     LOG.info(getClass().getSimpleName() + " with");
 
     LOG.info(LOG_PREFIX + "-vertexClass=" + vertexClass.getCanonicalName());
 
-    if (hiveToVertexClass != null) {
-      LOG.info(LOG_PREFIX + "-hiveToVertexClass=" +
-          hiveToVertexClass.getCanonicalName());
-    }
-    if (classes.getVertexInputFormatClass() != null) {
-      LOG.info(LOG_PREFIX + "-vertexInputFormatClass=" +
-          classes.getVertexInputFormatClass().getCanonicalName());
+    for (VertexInputFormatDescription description : vertexInputDescriptions) {
+      LOG.info(LOG_PREFIX + "Vertex input: " + description);
     }
 
-    if (hiveToEdgeClass != null) {
-      LOG.info(LOG_PREFIX + "-hiveToEdgeClass=" +
-          hiveToEdgeClass.getCanonicalName());
-    }
-    if (classes.getEdgeInputFormatClass() != null) {
-      LOG.info(LOG_PREFIX + "-edgeInputFormatClass=" +
-          classes.getEdgeInputFormatClass().getCanonicalName());
+    for (EdgeInputFormatDescription description : edgeInputDescriptions) {
+      LOG.info(LOG_PREFIX + "Edge input: " + description);
     }
 
     if (classes.getVertexOutputFormatClass() != null) {
-      LOG.info(LOG_PREFIX + "-outputFormatClass=" +
-          classes.getVertexOutputFormatClass().getCanonicalName());
+      LOG.info(LOG_PREFIX + "Output: VertexToHive=" +
+          vertexToHiveClass.getCanonicalName() + ", table=" +
+          HIVE_VERTEX_OUTPUT_TABLE.get(conf) + ", partition=\"" +
+          HIVE_VERTEX_OUTPUT_PARTITION.get(conf) + "\"");
     }
 
     LOG.info(LOG_PREFIX + "-workers=" + workers);
   }
+
+  /**
+   * Split a string using separator and trim the results
+   *
+   * @param stringToSplit String to split
+   * @param separator Separator
+   * @return Separated strings, trimmed
+   */
+  private static String[] split(String stringToSplit, String separator) {
+    return split(stringToSplit, separator, -1);
+  }
+
+  /**
+   * Split a string using separator and trim the results
+   *
+   * @param stringToSplit String to split
+   * @param separator Separator
+   * @param limit See {@link String#split(String, int)}
+   * @return Separated strings, trimmed
+   */
+  private static String[] split(String stringToSplit, String separator,
+      int limit) {
+    Splitter splitter = Splitter.on(separator).trimResults();
+    if (limit > 0) {
+      splitter = splitter.limit(limit);
+    }
+    return Iterables.toArray(splitter.split(stringToSplit), String.class);
+  }
+
+  /**
+   * Get the element in array at certain position, or null if the position is
+   * out of array size
+   *
+   * @param array Array
+   * @param position Position
+   * @return Element at the position or null if the position is out of array
+   */
+  private static String elementOrNull(String[] array, int position) {
+    return (position < array.length) ? array[position] : null;
+  }
+
+  /**
+   * Return a copy of array from some position to the end,
+   * or empty array if startIndex is out of array size
+   *
+   * @param array Array to take a copy from
+   * @param startIndex Starting position
+   * @return Copy of part of the array
+   */
+  private static String[] copyOfArray(String[] array, int startIndex) {
+    if (array.length <= startIndex) {
+      return new String[0];
+    } else {
+      return Arrays.copyOfRange(array, startIndex, array.length);
+    }
+  }
 }

Reply via email to