Updated Branches:
  refs/heads/trunk e28bfd323 -> 909d4c3cc

GIRAPH-789: Upgrade hive-io to 0.20 - less metastore accesses (majakabiljo)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/909d4c3c
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/909d4c3c
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/909d4c3c

Branch: refs/heads/trunk
Commit: 909d4c3cc09bcc83c199000f632c798b39b24bf7
Parents: e28bfd3
Author: Maja Kabiljo <[email protected]>
Authored: Thu Oct 31 13:55:00 2013 -0700
Committer: Maja Kabiljo <[email protected]>
Committed: Thu Oct 31 13:55:32 2013 -0700

----------------------------------------------------------------------
 CHANGELOG                                       |   2 +
 .../apache/giraph/hive/HiveGiraphRunner.java    | 113 ++++++++++++++++++-
 .../apache/giraph/hive/common/HiveUtils.java    |  49 --------
 .../hive/input/edge/HiveEdgeInputFormat.java    |   4 +-
 .../input/vertex/HiveVertexInputFormat.java     |   7 +-
 .../hive/output/HiveVertexOutputFormat.java     |   8 +-
 pom.xml                                         |   2 +-
 7 files changed, 118 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/909d4c3c/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 707d971..4465ac1 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 1.1.0 - unreleased
+  GIRAPH-789: Upgrade hive-io to 0.20 - less metastore accesses (majakabiljo)
+
   GIRAPH-787: Use HiveIO 1.9 (gmalewicz via aching)
 
   GIRAPH-786: XSparseVector create a lot of objects in add/write

http://git-wip-us.apache.org/repos/asf/giraph/blob/909d4c3c/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java b/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
index 6b8a8e9..cbcd788 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
@@ -26,6 +26,7 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.Computation;
 import org.apache.giraph.hive.common.HiveUtils;
 import org.apache.giraph.hive.input.edge.HiveEdgeInputFormat;
@@ -276,7 +277,58 @@ public class HiveGiraphRunner implements Tool {
   }
 
   /**
-   * Prepare vertex input settings in Configuration
+   * Create ImmutableClassesGiraphConfiguration from the provided Configuration
+   * which copies every value set on it back to the original
+   * Configuration
+   *
+   * @param conf Configuration to create ImmutableClassesGiraphConfiguration
+   *             from and update with any changes to the returned configuration
+   * @return ImmutableClassesGiraphConfiguration
+   */
+  private ImmutableClassesGiraphConfiguration createGiraphConf(
+      final Configuration conf) {
+    return new ImmutableClassesGiraphConfiguration(conf) {
+      @Override
+      public void set(String name, String value) {
+        super.set(name, value);
+        conf.set(name, value);
+      }
+    };
+  }
+
+  /**
+   * Create ImmutableClassesGiraphConfiguration from the provided Configuration
+   * which copies every value set on it into the provided
+   * InputFormatDescription
+   *
+   * @param conf Configuration to create ImmutableClassesGiraphConfiguration
+   *             from
+   * @param inputFormatDescription InputFormatDescription to update with any
+   *                               changes to the returned configuration
+   * @return ImmutableClassesGiraphConfiguration
+   */
+  private ImmutableClassesGiraphConfiguration createGiraphConf(
+      Configuration conf,
+      final InputFormatDescription inputFormatDescription) {
+    return new ImmutableClassesGiraphConfiguration(conf) {
+      @Override
+      public void set(String name, String value) {
+        super.set(name, value);
+        inputFormatDescription.addParameter(name, value);
+      }
+    };
+  }
+
+  /**
+   * Prepare vertex input settings in Configuration.
+   *
+   * For all Hive vertex inputs, add the user settings to the configuration.
+   * Additionally, this checks the input specs for every input and caches
+   * metadata information into the configuration to eliminate worker access to
+   * the metastore and fail earlier in the case that metadata doesn't exist.
+   * In the case of multiple vertex input descriptions, metadata is cached in
+   * each vertex input format description and then saved into a single
+   * Configuration via JSON.
    */
   @SuppressWarnings("unchecked")
   public void prepareHiveVertexInputs() {
@@ -284,7 +336,27 @@ public class HiveGiraphRunner implements Tool {
       GiraphConstants.VERTEX_INPUT_FORMAT_CLASS.set(conf,
           vertexInputDescriptions.get(0).getInputFormatClass());
       vertexInputDescriptions.get(0).putParametersToConfiguration(conf);
+      // Create VertexInputFormat in order to initialize the Configuration with
+      // data from metastore, and check it
+      createGiraphConf(conf).createWrappedVertexInputFormat()
+          .checkInputSpecs(conf);
     } else {
+      // For each of the VertexInputFormats we'll prepare Configuration
+      // parameters
+      for (int i = 0; i < vertexInputDescriptions.size(); i++) {
+        // Create a copy of the Configuration in order not to mess up the
+        // original one
+        Configuration confCopy = new Configuration(conf);
+        final VertexInputFormatDescription vertexInputDescription =
+            vertexInputDescriptions.get(i);
+        GiraphConstants.VERTEX_INPUT_FORMAT_CLASS.set(confCopy,
+            vertexInputDescription.getInputFormatClass());
+        vertexInputDescription.putParametersToConfiguration(confCopy);
+        // Create VertexInputFormat in order to initialize its description with
+        // data from metastore, and check it
+        createGiraphConf(confCopy, vertexInputDescription)
+            .createWrappedVertexInputFormat().checkInputSpecs(confCopy);
+      }
       GiraphConstants.VERTEX_INPUT_FORMAT_CLASS.set(conf,
           MultiVertexInputFormat.class);
       VertexInputFormatDescription.VERTEX_INPUT_FORMAT_DESCRIPTIONS.set(conf,
@@ -293,7 +365,15 @@ public class HiveGiraphRunner implements Tool {
   }
 
   /**
-   * Prepare edge input settings in Configuration
+   * Prepare edge input settings in Configuration.
+   *
+   * For all Hive edge inputs, add the user settings to the configuration.
+   * Additionally, this checks the input specs for every input and caches
+   * metadata information into the configuration to eliminate worker access to
+   * the metastore and fail earlier in the case that metadata doesn't exist.
+   * In the case of multiple edge input descriptions, metadata is cached in each
+   * edge input format description and then saved into a single
+   * Configuration via JSON.
    */
   @SuppressWarnings("unchecked")
   public void prepareHiveEdgeInputs() {
@@ -301,7 +381,27 @@ public class HiveGiraphRunner implements Tool {
       GiraphConstants.EDGE_INPUT_FORMAT_CLASS.set(conf,
           edgeInputDescriptions.get(0).getInputFormatClass());
       edgeInputDescriptions.get(0).putParametersToConfiguration(conf);
+      // Create EdgeInputFormat in order to initialize the Configuration with
+      // data from metastore, and check it
+      createGiraphConf(conf).createWrappedEdgeInputFormat()
+          .checkInputSpecs(conf);
     } else {
+      // For each of the EdgeInputFormats we'll prepare Configuration
+      // parameters
+      for (int i = 0; i < edgeInputDescriptions.size(); i++) {
+        // Create a copy of the Configuration in order not to mess up the
+        // original one
+        Configuration confCopy = new Configuration(conf);
+        final EdgeInputFormatDescription edgeInputDescription =
+            edgeInputDescriptions.get(i);
+        GiraphConstants.EDGE_INPUT_FORMAT_CLASS.set(confCopy,
+            edgeInputDescription.getInputFormatClass());
+        edgeInputDescription.putParametersToConfiguration(confCopy);
+        // Create EdgeInputFormat in order to initialize its description with
+        // data from metastore, and check it
+        createGiraphConf(confCopy, edgeInputDescription)
+            .createWrappedEdgeInputFormat().checkInputSpecs(confCopy);
+      }
       GiraphConstants.EDGE_INPUT_FORMAT_CLASS.set(conf,
           MultiEdgeInputFormat.class);
       EdgeInputFormatDescription.EDGE_INPUT_FORMAT_DESCRIPTIONS.set(conf,
@@ -310,11 +410,18 @@ public class HiveGiraphRunner implements Tool {
   }
 
   /**
-   * Prepare output settings in Configuration
+   * Prepare output settings in Configuration.
+   *
+   * This caches metadata information into the configuration to eliminate worker
+   * access to the metastore.
    */
   public void prepareHiveOutput() {
     GiraphConstants.VERTEX_OUTPUT_FORMAT_CLASS.set(conf,
         HiveVertexOutputFormat.class);
+    // Output format will be checked by Hadoop, here we only create it in
+    // order to initialize the Configuration with data from metastore.
+    // Can't check it here since we don't have JobContext yet
+    createGiraphConf(conf).createWrappedVertexOutputFormat();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/909d4c3c/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveUtils.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveUtils.java b/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveUtils.java
index b809413..2388673 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveUtils.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveUtils.java
@@ -29,10 +29,6 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.log4j.Logger;
 
-import com.facebook.hiveio.input.HiveApiInputFormat;
-import com.facebook.hiveio.input.HiveInputDescription;
-import com.facebook.hiveio.output.HiveApiOutputFormat;
-import com.facebook.hiveio.output.HiveOutputDescription;
 import com.facebook.hiveio.schema.HiveTableSchema;
 import com.facebook.hiveio.schema.HiveTableSchemas;
 import com.google.common.base.Splitter;
@@ -65,51 +61,6 @@ public class HiveUtils {
   }
 
   /**
-   * Initialize hive input, prepare Configuration parameters
-   *
-   * @param hiveInputFormat HiveApiInputFormat
-   * @param inputDescription HiveInputDescription
-   * @param profileId profile ID
-   * @param conf Configuration
-   */
-  public static void initializeHiveInput(HiveApiInputFormat hiveInputFormat,
-      HiveInputDescription inputDescription, String profileId,
-      Configuration conf) {
-    try {
-      hiveInputFormat.setMyProfileId(profileId);
-      HiveApiInputFormat.setProfileInputDesc(conf, inputDescription, profileId);
-      HiveTableSchema schema = HiveTableSchemas.lookup(
-          conf, inputDescription.getTableDesc());
-      HiveTableSchemas.put(conf, profileId, schema);
-    } catch (IOException e) {
-      throw new IllegalStateException(
-          "initializeHiveInput: IOException occurred", e);
-    }
-  }
-
-  /**
-   * Initialize hive output, prepare Configuration parameters
-   *
-   * @param hiveOutputFormat HiveApiOutputFormat
-   * @param outputDesc HiveOutputDescription
-   * @param profileId Profile id
-   * @param conf Configuration
-   */
-  public static void initializeHiveOutput(HiveApiOutputFormat hiveOutputFormat,
-      HiveOutputDescription outputDesc, String profileId, Configuration conf) {
-    try {
-      hiveOutputFormat.setMyProfileId(profileId);
-      HiveApiOutputFormat.initProfile(conf, outputDesc, profileId);
-      HiveTableSchema schema = HiveTableSchemas.lookup(
-          conf, outputDesc.getTableDesc());
-      HiveTableSchemas.put(conf, profileId, schema);
-    } catch (IOException e) {
-      throw new IllegalStateException(
-          "initializeHiveOutput: IOException occurred", e);
-    }
-  }
-
-  /**
    * @param outputTablePartitionString table partition string
    * @return Map
    */

http://git-wip-us.apache.org/repos/asf/giraph/blob/909d4c3c/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
index 534a773..6ba2aec 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
@@ -40,7 +40,6 @@ import com.facebook.hiveio.schema.HiveTableSchema;
 import java.io.IOException;
 import java.util.List;
 
-
 /**
  * {@link EdgeInputFormat} for reading edges from Hive.
  *
@@ -71,8 +70,7 @@ public class HiveEdgeInputFormat<I extends WritableComparable,
   public void setConf(
       ImmutableClassesGiraphConfiguration<I, Writable, E> conf) {
     super.setConf(conf);
-    HiveUtils.initializeHiveInput(
-        hiveInputFormat,
+    hiveInputFormat.initialize(
         GiraphHiveConstants.HIVE_EDGE_INPUT.makeInputDescription(conf),
         GiraphHiveConstants.HIVE_EDGE_INPUT.getProfileID(conf),
         conf);

http://git-wip-us.apache.org/repos/asf/giraph/blob/909d4c3c/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
index d5c1279..499d839 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
@@ -20,7 +20,6 @@ package org.apache.giraph.hive.input.vertex;
 
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.hive.common.GiraphHiveConstants;
-import org.apache.giraph.hive.common.HiveUtils;
 import org.apache.giraph.io.VertexInputFormat;
 import org.apache.giraph.io.VertexReader;
 import org.apache.giraph.io.iterables.VertexReaderWrapper;
@@ -72,11 +71,9 @@ public class HiveVertexInputFormat<I extends WritableComparable,
   }
 
   @Override
-  public void setConf(
-      ImmutableClassesGiraphConfiguration<I, V, E> conf) {
+  public void setConf(ImmutableClassesGiraphConfiguration<I, V, E> conf) {
     super.setConf(conf);
-    HiveUtils.initializeHiveInput(
-        hiveInputFormat,
+    hiveInputFormat.initialize(
         GiraphHiveConstants.HIVE_VERTEX_INPUT.makeInputDescription(conf),
         GiraphHiveConstants.HIVE_VERTEX_INPUT.getProfileID(conf),
         conf);

http://git-wip-us.apache.org/repos/asf/giraph/blob/909d4c3c/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexOutputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexOutputFormat.java
index c4813fb..97f61bd 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexOutputFormat.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexOutputFormat.java
@@ -19,7 +19,6 @@
 package org.apache.giraph.hive.output;
 
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.hive.common.HiveUtils;
 import org.apache.giraph.io.VertexOutputFormat;
 import org.apache.giraph.io.VertexWriter;
 import org.apache.hadoop.io.Writable;
@@ -84,11 +83,8 @@ public class HiveVertexOutputFormat<I extends WritableComparable,
   public void setConf(
       ImmutableClassesGiraphConfiguration<I, V, E> conf) {
     super.setConf(conf);
-    HiveUtils.initializeHiveOutput(
-        hiveOutputFormat,
-        makeOutputDesc(),
-        HIVE_VERTEX_OUTPUT_PROFILE_ID.get(conf),
-        conf);
+    hiveOutputFormat.initialize(makeOutputDesc(),
+        HIVE_VERTEX_OUTPUT_PROFILE_ID.get(conf), conf);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/909d4c3c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f2981ff..43804f6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -272,7 +272,7 @@ under the License.
     <dep.guava.version>12.0</dep.guava.version>
     <dep.hcatalog.version>0.5.0-incubating</dep.hcatalog.version>
     <dep.hive.version>0.11.0</dep.hive.version>
-    <dep.hiveio.version>0.19</dep.hiveio.version>
+    <dep.hiveio.version>0.20</dep.hiveio.version>
     <dep.json.version>20090211</dep.json.version>
     <dep.junit.version>4.8</dep.junit.version>
     <dep.jython.version>2.5.3</dep.jython.version>

Reply via email to