Updated Branches:
  refs/heads/trunk 2a6c9d563 -> a68f2bada

GIRAPH-593: Update Hive IO performance improvements (nitay)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/a68f2bad
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/a68f2bad
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/a68f2bad

Branch: refs/heads/trunk
Commit: a68f2badaa216125106bcc1e9397edf43623a9ec
Parents: 2a6c9d5
Author: Nitay Joffe <[email protected]>
Authored: Thu Mar 28 15:37:14 2013 -0400
Committer: Nitay Joffe <[email protected]>
Committed: Thu Mar 28 15:37:14 2013 -0400

----------------------------------------------------------------------
 CHANGELOG                                          |    2 +
 .../org/apache/giraph/hive/HiveGiraphRunner.java   |   39 ++++++++----
 .../DefaultConfigurableAndTableSchemaAware.java    |    4 +-
 .../giraph/hive/common/GiraphHiveConstants.java    |   47 +++++++++++++++
 .../apache/giraph/hive/common/HiveProfiles.java    |    3 -
 .../hive/input/edge/HiveEdgeInputFormat.java       |    4 +-
 .../giraph/hive/input/edge/HiveEdgeReader.java     |   26 ++++----
 .../apache/giraph/hive/input/edge/HiveToEdge.java  |    4 +-
 .../giraph/hive/input/edge/SimpleHiveToEdge.java   |   17 ++----
 .../giraph/hive/input/vertex/HiveToVertex.java     |    4 +-
 .../hive/input/vertex/HiveVertexInputFormat.java   |    5 +-
 .../giraph/hive/input/vertex/HiveVertexReader.java |   32 +++++-----
 .../hive/input/vertex/SimpleHiveToVertex.java      |    9 +--
 .../input/vertex/SimpleNoEdgesHiveToVertex.java    |    2 +-
 .../apache/giraph/hive/output/HiveRecordSaver.java |    4 +-
 .../giraph/hive/output/HiveVertexOutputFormat.java |    4 +-
 .../giraph/hive/output/HiveVertexWriter.java       |   24 ++++----
 .../giraph/hive/output/SimpleVertexToHive.java     |    4 +-
 .../apache/giraph/hive/output/VertexToHive.java    |    2 +-
 pom.xml                                            |    2 +-
 20 files changed, 146 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/a68f2bad/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index ab59833..4e2036f 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-593: Update Hive IO performance improvements (nitay)
+
   GIRAPH-594: auto set reusing objects (nitay)
 
   GIRAPH-597: Don't reuse vertex by default in SimpleHiveToVertex (majakabiljo)

http://git-wip-us.apache.org/repos/asf/giraph/blob/a68f2bad/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java b/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
index 63e9f95..712134e 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/HiveGiraphRunner.java
@@ -27,13 +27,10 @@ import org.apache.commons.cli.ParseException;
 import org.apache.giraph.conf.GiraphClasses;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.hive.common.HiveProfiles;
 import org.apache.giraph.hive.input.edge.HiveEdgeInputFormat;
-import org.apache.giraph.hive.input.edge.HiveEdgeReader;
 import org.apache.giraph.hive.input.edge.HiveToEdge;
 import org.apache.giraph.hive.input.vertex.HiveToVertex;
 import org.apache.giraph.hive.input.vertex.HiveVertexInputFormat;
-import org.apache.giraph.hive.input.vertex.HiveVertexReader;
 import org.apache.giraph.hive.output.HiveVertexOutputFormat;
 import org.apache.giraph.hive.output.HiveVertexWriter;
 import org.apache.giraph.hive.output.VertexToHive;
@@ -49,6 +46,7 @@ import com.facebook.giraph.hive.input.HiveApiInputFormat;
 import com.facebook.giraph.hive.input.HiveInputDescription;
 import com.facebook.giraph.hive.output.HiveApiOutputFormat;
 import com.facebook.giraph.hive.output.HiveOutputDescription;
+import com.facebook.giraph.hive.schema.HiveTableSchemas;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -59,6 +57,14 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_EDGE_SPLITS;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_TO_EDGE_CLASS;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_TO_VERTEX_CLASS;
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_VERTEX_SPLITS;
+import static org.apache.giraph.hive.common.HiveProfiles.EDGE_INPUT_PROFILE_ID;
+import static org.apache.giraph.hive.common.HiveProfiles.VERTEX_INPUT_PROFILE_ID;
+import static org.apache.giraph.hive.common.HiveProfiles.VERTEX_OUTPUT_PROFILE_ID;
+
 /**
  * Hive Giraph Runner
  */
@@ -136,8 +142,7 @@ public class HiveGiraphRunner implements Tool {
   public void setHiveToVertexClass(
       Class<? extends HiveToVertex> hiveToVertexClass) {
     this.hiveToVertexClass = hiveToVertexClass;
-    conf.setClass(HiveVertexReader.HIVE_TO_VERTEX_KEY, hiveToVertexClass,
-        HiveToVertex.class);
+    HIVE_TO_VERTEX_CLASS.set(conf, hiveToVertexClass);
   }
 
   /**
@@ -169,8 +174,7 @@ public class HiveGiraphRunner implements Tool {
    */
   public void setHiveToEdgeClass(Class<? extends HiveToEdge> hiveToEdgeClass) {
     this.hiveToEdgeClass = hiveToEdgeClass;
-    conf.setClass(HiveEdgeReader.HIVE_TO_EDGE_KEY, hiveToEdgeClass,
-        HiveToEdge.class);
+    HIVE_TO_EDGE_CLASS.set(conf, hiveToEdgeClass);
   }
 
   public Class<? extends VertexToHive> getVertexToHiveClass() {
@@ -247,15 +251,21 @@ public class HiveGiraphRunner implements Tool {
    */
   private void setupHiveInputs(GiraphConfiguration conf) throws TException {
     if (hiveToVertexClass != null) {
-      HiveApiInputFormat.initProfile(conf, hiveVertexInputDescription,
-          HiveProfiles.VERTEX_INPUT_PROFILE_ID);
+      hiveVertexInputDescription.setNumSplits(HIVE_VERTEX_SPLITS.get(conf));
+      HiveApiInputFormat.setProfileInputDesc(conf, hiveVertexInputDescription,
+          VERTEX_INPUT_PROFILE_ID);
       conf.setVertexInputFormatClass(HiveVertexInputFormat.class);
+      HiveTableSchemas.put(conf, VERTEX_INPUT_PROFILE_ID,
+          hiveVertexInputDescription.hiveTableName());
     }
 
     if (hiveToEdgeClass != null) {
-      HiveApiInputFormat.initProfile(conf, hiveEdgeInputDescription,
-          HiveProfiles.EDGE_INPUT_PROFILE_ID);
+      hiveEdgeInputDescription.setNumSplits(HIVE_EDGE_SPLITS.get(conf));
+      HiveApiInputFormat.setProfileInputDesc(conf, hiveEdgeInputDescription,
+          EDGE_INPUT_PROFILE_ID);
       conf.setEdgeInputFormatClass(HiveEdgeInputFormat.class);
+      HiveTableSchemas.put(conf, EDGE_INPUT_PROFILE_ID,
+          hiveEdgeInputDescription.hiveTableName());
     }
   }
 
@@ -270,8 +280,10 @@ public class HiveGiraphRunner implements Tool {
       LOG.warn("run: Warning - Output will be skipped!");
     } else if (vertexToHiveClass != null) {
       HiveApiOutputFormat.initProfile(conf, hiveOutputDescription,
-          HiveProfiles.VERTEX_OUTPUT_PROFILE_ID);
+          VERTEX_OUTPUT_PROFILE_ID);
       conf.setVertexOutputFormatClass(HiveVertexOutputFormat.class);
+      HiveTableSchemas.put(conf, VERTEX_OUTPUT_PROFILE_ID,
+          hiveOutputDescription.hiveTableName());
     } else {
       LOG.fatal("output requested but " + VertexToHive.class.getSimpleName() +
           " not set");
@@ -336,8 +348,7 @@ public class HiveGiraphRunner implements Tool {
 
     String hiveToVertexClassStr = cmdln.getOptionValue("hiveToVertexClass");
     if (hiveToVertexClassStr != null) {
-      setHiveToVertexClass(findClass(hiveToVertexClassStr,
-          HiveToVertex.class));
+      setHiveToVertexClass(findClass(hiveToVertexClassStr, HiveToVertex.class));
     }
 
     String hiveToEdgeClassStr = cmdln.getOptionValue("hiveToEdgeClass");

http://git-wip-us.apache.org/repos/asf/giraph/blob/a68f2bad/giraph-hive/src/main/java/org/apache/giraph/hive/common/DefaultConfigurableAndTableSchemaAware.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/common/DefaultConfigurableAndTableSchemaAware.java b/giraph-hive/src/main/java/org/apache/giraph/hive/common/DefaultConfigurableAndTableSchemaAware.java
index c8b201f..ae32b71 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/common/DefaultConfigurableAndTableSchemaAware.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/common/DefaultConfigurableAndTableSchemaAware.java
@@ -22,8 +22,8 @@ import org.apache.giraph.conf.DefaultImmutableClassesGiraphConfigurable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-import com.facebook.giraph.hive.HiveTableSchema;
-import com.facebook.giraph.hive.HiveTableSchemaAware;
+import com.facebook.giraph.hive.schema.HiveTableSchema;
+import com.facebook.giraph.hive.schema.HiveTableSchemaAware;
 
 /**
  * Default implementation of {@link HiveTableSchemaAware} and

http://git-wip-us.apache.org/repos/asf/giraph/blob/a68f2bad/giraph-hive/src/main/java/org/apache/giraph/hive/common/GiraphHiveConstants.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/common/GiraphHiveConstants.java b/giraph-hive/src/main/java/org/apache/giraph/hive/common/GiraphHiveConstants.java
new file mode 100644
index 0000000..f8363b1
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/common/GiraphHiveConstants.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.hive.common;
+
+import org.apache.giraph.conf.ClassConfOption;
+import org.apache.giraph.conf.IntConfOption;
+import org.apache.giraph.hive.input.edge.HiveToEdge;
+import org.apache.giraph.hive.input.vertex.HiveToVertex;
+
+/**
+ * Configuration options (keys and option objects) used by giraph-hive
+ */
+public class GiraphHiveConstants {
+  /** Number of splits to request when reading edges from Hive */
+  public static final IntConfOption HIVE_EDGE_SPLITS =
+      new IntConfOption("giraph.hive.input.edge.splits", 0);
+  /** Number of splits to request when reading vertices from Hive */
+  public static final IntConfOption HIVE_VERTEX_SPLITS =
+      new IntConfOption("giraph.hive.input.vertex.splits", 0);
+  /** {@link HiveToEdge} class used to convert Hive records into edges */
+  public static final ClassConfOption<HiveToEdge> HIVE_TO_EDGE_CLASS =
+      ClassConfOption.create("giraph.hive.to.edge.class", null,
+          HiveToEdge.class);
+  /** {@link HiveToVertex} class used to convert Hive records into vertices */
+  public static final ClassConfOption<HiveToVertex> HIVE_TO_VERTEX_CLASS =
+      ClassConfOption.create("giraph.hive.to.vertex.class", null,
+          HiveToVertex.class);
+
+  /** Holder of constants only, not meant to be instantiated */
+  protected GiraphHiveConstants() { }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a68f2bad/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveProfiles.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveProfiles.java b/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveProfiles.java
index b0ddc48..892d443 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveProfiles.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/common/HiveProfiles.java
@@ -26,9 +26,6 @@ public class HiveProfiles {
   public static final String VERTEX_INPUT_PROFILE_ID = "vertex_input_profile";
   /** name of edge input profile */
   public static final String EDGE_INPUT_PROFILE_ID = "edge_input_profile";
-  /** name of vertex value input profile */
-  public static final String VERTEX_VALUE_INPUT_PROFILE_ID =
-      "vertex_value_input_profile";
   /** Name of vertex output profile */
   public static final String VERTEX_OUTPUT_PROFILE_ID = "vertex_output_profile";
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/a68f2bad/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
index 68edbfc..041e331 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeInputFormat.java
@@ -30,8 +30,8 @@ import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
-import com.facebook.giraph.hive.HiveRecord;
 import com.facebook.giraph.hive.input.HiveApiInputFormat;
+import com.facebook.giraph.hive.record.HiveReadableRecord;
 
 import java.io.IOException;
 import java.util.List;
@@ -70,7 +70,7 @@ public class HiveEdgeInputFormat<I extends WritableComparable,
     HiveEdgeReader<I, E> reader = new HiveEdgeReader<I, E>();
     reader.setTableSchema(hiveInputFormat.getTableSchema(conf));
 
-    RecordReader<WritableComparable, HiveRecord> baseReader;
+    RecordReader<WritableComparable, HiveReadableRecord> baseReader;
     try {
       baseReader = hiveInputFormat.createRecordReader(split, context);
     } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/a68f2bad/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java
index 275f8f7..e1a69cf 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveEdgeReader.java
@@ -19,9 +19,9 @@
 package org.apache.giraph.hive.input.edge;
 
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.hive.input.RecordReaderWrapper;
 import org.apache.giraph.io.iterables.EdgeWithSource;
 import org.apache.giraph.io.iterables.GiraphReader;
-import org.apache.giraph.hive.input.RecordReaderWrapper;
 import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -29,13 +29,15 @@ import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
-import com.facebook.giraph.hive.HiveRecord;
-import com.facebook.giraph.hive.HiveTableSchema;
-import com.facebook.giraph.hive.HiveTableSchemaAware;
-import com.facebook.giraph.hive.HiveTableSchemas;
+import com.facebook.giraph.hive.record.HiveReadableRecord;
+import com.facebook.giraph.hive.schema.HiveTableSchema;
+import com.facebook.giraph.hive.schema.HiveTableSchemaAware;
+import com.facebook.giraph.hive.schema.HiveTableSchemas;
 
 import java.io.IOException;
 
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_TO_EDGE_CLASS;
+
 /**
  * A reader for reading edges from Hive.
  *
@@ -48,7 +50,7 @@ public class HiveEdgeReader<I extends WritableComparable, E extends Writable>
   public static final String HIVE_TO_EDGE_KEY = "giraph.hive.to.edge.class";
 
   /** Underlying Hive RecordReader used */
-  private RecordReader<WritableComparable, HiveRecord> hiveRecordReader;
+  private RecordReader<WritableComparable, HiveReadableRecord> hiveRecordReader;
   /** Schema for table in Hive */
   private HiveTableSchema tableSchema;
 
@@ -63,7 +65,8 @@ public class HiveEdgeReader<I extends WritableComparable, E extends Writable>
    *
    * @return RecordReader from Hive
    */
-  public RecordReader<WritableComparable, HiveRecord> getHiveRecordReader() {
+  public RecordReader<WritableComparable, HiveReadableRecord>
+  getHiveRecordReader() {
     return hiveRecordReader;
   }
 
@@ -73,7 +76,7 @@ public class HiveEdgeReader<I extends WritableComparable, E extends Writable>
    * @param hiveRecordReader RecordReader to read from Hive.
    */
   public void setHiveRecordReader(
-      RecordReader<WritableComparable, HiveRecord> hiveRecordReader) {
+      RecordReader<WritableComparable, HiveReadableRecord> hiveRecordReader) {
     this.hiveRecordReader = hiveRecordReader;
   }
 
@@ -103,7 +106,7 @@ public class HiveEdgeReader<I extends WritableComparable, E extends Writable>
     conf = new ImmutableClassesGiraphConfiguration(context.getConfiguration());
     instantiateHiveToEdgeFromConf();
     hiveToEdge.initializeRecords(
-        new RecordReaderWrapper<HiveRecord>(hiveRecordReader));
+        new RecordReaderWrapper<HiveReadableRecord>(hiveRecordReader));
   }
 
   /**
@@ -112,10 +115,9 @@ public class HiveEdgeReader<I extends WritableComparable, E extends Writable>
    * @throws IOException if anything goes wrong reading from Configuration
    */
   private void instantiateHiveToEdgeFromConf() throws IOException {
-    Class<? extends HiveToEdge> klass = conf.getClass(HIVE_TO_EDGE_KEY,
-        null, HiveToEdge.class);
+    Class<? extends HiveToEdge> klass = HIVE_TO_EDGE_CLASS.get(conf);
     if (klass == null) {
-      throw new IOException(HIVE_TO_EDGE_KEY + " not set in conf");
+      throw new IOException(HIVE_TO_EDGE_CLASS.getKey() + " not set in conf");
     }
     hiveToEdge = ReflectionUtils.newInstance(klass, conf);
     HiveTableSchemas.configure(hiveToEdge, tableSchema);

http://git-wip-us.apache.org/repos/asf/giraph/blob/a68f2bad/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveToEdge.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveToEdge.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveToEdge.java
index 8b22a8f..515f1da 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveToEdge.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/HiveToEdge.java
@@ -22,7 +22,7 @@ import org.apache.giraph.io.iterables.EdgeWithSource;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-import com.facebook.giraph.hive.HiveRecord;
+import com.facebook.giraph.hive.record.HiveReadableRecord;
 
 import java.util.Iterator;
 
@@ -43,5 +43,5 @@ public interface HiveToEdge<I extends WritableComparable,
    *
    * @param records Hive records
    */
-  void initializeRecords(Iterator<HiveRecord> records);
+  void initializeRecords(Iterator<HiveReadableRecord> records);
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/a68f2bad/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/SimpleHiveToEdge.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/SimpleHiveToEdge.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/SimpleHiveToEdge.java
index 7aa8721..94a811e 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/SimpleHiveToEdge.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/SimpleHiveToEdge.java
@@ -18,13 +18,11 @@
 
 package org.apache.giraph.hive.input.edge;
 
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.io.iterables.EdgeWithSource;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-import com.facebook.giraph.hive.HiveReadableRecord;
-import com.facebook.giraph.hive.HiveRecord;
+import com.facebook.giraph.hive.record.HiveReadableRecord;
 
 import java.util.Iterator;
 
@@ -41,7 +39,7 @@ public abstract class SimpleHiveToEdge<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable>
     extends AbstractHiveToEdge<I, V, E, M> {
   /** Iterator over input records */
-  private Iterator<HiveRecord> records;
+  private Iterator<HiveReadableRecord> records;
   /** Reusable {@link EdgeWithSource} object */
   private EdgeWithSource<I, E> reusableEdge = new EdgeWithSource<I, E>();
 
@@ -70,14 +68,9 @@ public abstract class SimpleHiveToEdge<I extends WritableComparable,
   public abstract E getEdgeValue(HiveReadableRecord hiveRecord);
 
   @Override
-  public void setConf(ImmutableClassesGiraphConfiguration<I, V, E, M> conf) {
-    super.setConf(conf);
-    reusableEdge.setEdge(getConf().createReusableEdge());
-  }
-
-  @Override
-  public final void initializeRecords(Iterator<HiveRecord> records) {
+  public final void initializeRecords(Iterator<HiveReadableRecord> records) {
     this.records = records;
+    reusableEdge.setEdge(getConf().createReusableEdge());
   }
 
   @Override
@@ -87,7 +80,7 @@ public abstract class SimpleHiveToEdge<I extends WritableComparable,
 
   @Override
   public EdgeWithSource<I, E> next() {
-    HiveRecord record = records.next();
+    HiveReadableRecord record = records.next();
     reusableEdge.setSourceVertexId(getSourceVertexId(record));
     reusableEdge.setTargetVertexId(getTargetVertexId(record));
     reusableEdge.setEdgeValue(getEdgeValue(record));

http://git-wip-us.apache.org/repos/asf/giraph/blob/a68f2bad/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertex.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertex.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertex.java
index 1179961..f57dc4e 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertex.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveToVertex.java
@@ -22,7 +22,7 @@ import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-import com.facebook.giraph.hive.HiveRecord;
+import com.facebook.giraph.hive.record.HiveReadableRecord;
 
 import java.util.Iterator;
 
@@ -46,5 +46,5 @@ public interface HiveToVertex<I extends WritableComparable,
    *
    * @param records Hive records
    */
-  void initializeRecords(Iterator<HiveRecord> records);
+  void initializeRecords(Iterator<HiveReadableRecord> records);
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/a68f2bad/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
index 4f70750..3b25444 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexInputFormat.java
@@ -27,10 +27,11 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
-import com.facebook.giraph.hive.impl.input.HiveApiRecordReader;
 import com.facebook.giraph.hive.input.HiveApiInputFormat;
+import com.facebook.giraph.hive.record.HiveReadableRecord;
 
 import java.io.IOException;
 import java.util.List;
@@ -71,7 +72,7 @@ public class HiveVertexInputFormat<I extends WritableComparable,
     HiveVertexReader<I, V, E, M> reader = new HiveVertexReader<I, V, E, M>();
     reader.setTableSchema(hiveInputFormat.getTableSchema(conf));
 
-    HiveApiRecordReader baseReader;
+    RecordReader<WritableComparable, HiveReadableRecord> baseReader;
     try {
       baseReader = hiveInputFormat.createRecordReader(split, context);
     } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/a68f2bad/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java
index 442c796..9c172be 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/HiveVertexReader.java
@@ -26,16 +26,18 @@ import org.apache.giraph.utils.ReflectionUtils;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
-import com.facebook.giraph.hive.HiveRecord;
-import com.facebook.giraph.hive.HiveTableSchema;
-import com.facebook.giraph.hive.HiveTableSchemaAware;
-import com.facebook.giraph.hive.HiveTableSchemas;
-import com.facebook.giraph.hive.impl.input.HiveApiRecordReader;
+import com.facebook.giraph.hive.record.HiveReadableRecord;
+import com.facebook.giraph.hive.schema.HiveTableSchema;
+import com.facebook.giraph.hive.schema.HiveTableSchemaAware;
+import com.facebook.giraph.hive.schema.HiveTableSchemas;
 
 import java.io.IOException;
 
+import static org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_TO_VERTEX_CLASS;
+
 /**
  * VertexReader using Hive
  *
@@ -47,11 +49,8 @@ import java.io.IOException;
 public class HiveVertexReader<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable>
     implements GiraphReader<Vertex<I, V, E, M>>, HiveTableSchemaAware {
-  /** Configuration key for {@link HiveToVertex} class */
-  public static final String HIVE_TO_VERTEX_KEY =
-      "giraph.hive.to.vertex.class";
   /** Underlying Hive RecordReader used */
-  private HiveApiRecordReader hiveRecordReader;
+  private RecordReader<WritableComparable, HiveReadableRecord> hiveRecordReader;
   /** Schema for table in Hive */
   private HiveTableSchema tableSchema;
 
@@ -69,7 +68,8 @@ public class HiveVertexReader<I extends WritableComparable,
    *
    * @return RecordReader from Hive.
    */
-  public HiveApiRecordReader getHiveRecordReader() {
+  public RecordReader<WritableComparable, HiveReadableRecord>
+  getHiveRecordReader() {
     return hiveRecordReader;
   }
 
@@ -78,7 +78,8 @@ public class HiveVertexReader<I extends WritableComparable,
    *
    * @param hiveRecordReader RecordReader to read from Hive.
    */
-  public void setHiveRecordReader(HiveApiRecordReader hiveRecordReader) {
+  public void setHiveRecordReader(
+      RecordReader<WritableComparable, HiveReadableRecord> hiveRecordReader) {
     this.hiveRecordReader = hiveRecordReader;
   }
 
@@ -105,14 +106,13 @@ public class HiveVertexReader<I extends WritableComparable,
   public void initialize(InputSplit inputSplit,
       TaskAttemptContext context) throws IOException, InterruptedException {
     hiveRecordReader.initialize(inputSplit, context);
-    conf = new ImmutableClassesGiraphConfiguration<I, V, E,
-        M>(context.getConfiguration());
-    Class<? extends HiveToVertex> klass = conf.getClass(HIVE_TO_VERTEX_KEY,
-        SimpleHiveToVertex.class, HiveToVertex.class);
+    conf = new ImmutableClassesGiraphConfiguration<I, V, E, M>(
+        context.getConfiguration());
+    Class<? extends HiveToVertex> klass = HIVE_TO_VERTEX_CLASS.get(conf);
     hiveToVertex = ReflectionUtils.newInstance(klass, conf);
     HiveTableSchemas.configure(hiveToVertex, tableSchema);
     hiveToVertex.initializeRecords(
-        new RecordReaderWrapper<HiveRecord>(hiveRecordReader));
+        new RecordReaderWrapper<HiveReadableRecord>(hiveRecordReader));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/a68f2bad/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleHiveToVertex.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleHiveToVertex.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleHiveToVertex.java
index a4acd2f..59c10be 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleHiveToVertex.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleHiveToVertex.java
@@ -24,8 +24,7 @@ import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-import com.facebook.giraph.hive.HiveReadableRecord;
-import com.facebook.giraph.hive.HiveRecord;
+import com.facebook.giraph.hive.record.HiveReadableRecord;
 
 import java.util.Iterator;
 
@@ -42,7 +41,7 @@ public abstract class SimpleHiveToVertex<I extends WritableComparable,
     V extends Writable, E extends Writable, M extends Writable>
     extends AbstractHiveToVertex<I, V, E, M> {
   /** Hive records which we are reading from */
-  private Iterator<HiveRecord> records;
+  private Iterator<HiveReadableRecord> records;
 
   /** Reusable vertex object */
   private Vertex<I, V, E, M> reusableVertex = null;
@@ -80,7 +79,7 @@ public abstract class SimpleHiveToVertex<I extends WritableComparable,
   }
 
   @Override
-  public void initializeRecords(Iterator<HiveRecord> records) {
+  public void initializeRecords(Iterator<HiveReadableRecord> records) {
     this.records = records;
   }
 
@@ -91,7 +90,7 @@ public abstract class SimpleHiveToVertex<I extends WritableComparable,
 
   @Override
   public Vertex<I, V, E, M> next() {
-    HiveRecord record = records.next();
+    HiveReadableRecord record = records.next();
     I id = getVertexId(record);
     V value = getVertexValue(record);
     Iterable<Edge<I, E>> edges = getEdges(record);

http://git-wip-us.apache.org/repos/asf/giraph/blob/a68f2bad/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleNoEdgesHiveToVertex.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleNoEdgesHiveToVertex.java b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleNoEdgesHiveToVertex.java
index 195a69e..ca427a0 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleNoEdgesHiveToVertex.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/SimpleNoEdgesHiveToVertex.java
@@ -22,7 +22,7 @@ import org.apache.giraph.edge.Edge;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-import com.facebook.giraph.hive.HiveReadableRecord;
+import com.facebook.giraph.hive.record.HiveReadableRecord;
 import com.google.common.collect.ImmutableList;
 
 /**

http://git-wip-us.apache.org/repos/asf/giraph/blob/a68f2bad/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveRecordSaver.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveRecordSaver.java b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveRecordSaver.java
index 70de517..a0a4207 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveRecordSaver.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveRecordSaver.java
@@ -18,7 +18,7 @@
 
 package org.apache.giraph.hive.output;
 
-import com.facebook.giraph.hive.HiveRecord;
+import com.facebook.giraph.hive.record.HiveWritableRecord;
 
 import java.io.IOException;
 
@@ -33,5 +33,5 @@ public interface HiveRecordSaver {
    * @throws IOException
    * @throws InterruptedException
    */
-  void save(HiveRecord record) throws IOException, InterruptedException;
+  void save(HiveWritableRecord record) throws IOException, InterruptedException;
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/a68f2bad/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexOutputFormat.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexOutputFormat.java b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexOutputFormat.java
index 641a298..9ff74d6 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexOutputFormat.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexOutputFormat.java
@@ -29,8 +29,8 @@ import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
-import com.facebook.giraph.hive.HiveRecord;
 import com.facebook.giraph.hive.output.HiveApiOutputFormat;
+import com.facebook.giraph.hive.record.HiveWritableRecord;
 
 import java.io.IOException;
 
@@ -60,7 +60,7 @@ public class HiveVertexOutputFormat<I extends WritableComparable,
     throws IOException, InterruptedException {
     Configuration conf = context.getConfiguration();
 
-    RecordWriter<WritableComparable, HiveRecord> baseWriter =
+    RecordWriter<WritableComparable, HiveWritableRecord> baseWriter =
         hiveOutputFormat.getRecordWriter(context);
     HiveVertexWriter writer = new HiveVertexWriter();
     writer.setBaseWriter(baseWriter);

http://git-wip-us.apache.org/repos/asf/giraph/blob/a68f2bad/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java
index 3eef1f4..45d0226 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java
@@ -29,13 +29,14 @@ import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.log4j.Logger;
 
-import com.facebook.giraph.hive.HiveRecord;
-import com.facebook.giraph.hive.HiveTableSchema;
-import com.facebook.giraph.hive.HiveTableSchemas;
-import com.facebook.giraph.hive.impl.HiveApiRecord;
+import com.facebook.giraph.hive.input.parser.hive.DefaultRecord;
+import com.facebook.giraph.hive.record.HiveRecord;
+import com.facebook.giraph.hive.record.HiveWritableRecord;
+import com.facebook.giraph.hive.schema.HiveTableSchema;
+import com.facebook.giraph.hive.schema.HiveTableSchemas;
 
 import java.io.IOException;
-import java.util.Collections;
+
 
 /**
  * Vertex writer using Hive.
@@ -53,7 +54,7 @@ public class HiveVertexWriter<I extends WritableComparable, V extends Writable,
   private static final Logger LOG = Logger.getLogger(HiveVertexWriter.class);
 
   /** Underlying Hive RecordWriter used */
-  private RecordWriter<WritableComparable, HiveRecord>  hiveRecordWriter;
+  private RecordWriter<WritableComparable, HiveWritableRecord> hiveRecordWriter;
   /** Schema for table in Hive */
   private HiveTableSchema tableSchema;
 
@@ -71,7 +72,7 @@ public class HiveVertexWriter<I extends WritableComparable, V extends Writable,
    *
    * @return RecordWriter for Hive.
    */
-  public RecordWriter<WritableComparable, HiveRecord> getBaseWriter() {
+  public RecordWriter<WritableComparable, HiveWritableRecord> getBaseWriter() {
     return hiveRecordWriter;
   }
 
@@ -81,7 +82,7 @@ public class HiveVertexWriter<I extends WritableComparable, V extends Writable,
    * @param hiveRecordWriter RecordWriter to write to Hive.
    */
   public void setBaseWriter(
-      RecordWriter<WritableComparable, HiveRecord> hiveRecordWriter) {
+      RecordWriter<WritableComparable, HiveWritableRecord> hiveRecordWriter) {
     this.hiveRecordWriter = hiveRecordWriter;
   }
 
@@ -101,8 +102,8 @@ public class HiveVertexWriter<I extends WritableComparable, V extends Writable,
    */
   public void setTableSchema(HiveTableSchema tableSchema) {
     this.tableSchema = tableSchema;
-    reusableRecord = new HiveApiRecord(tableSchema.numColumns(),
-        Collections.<String>emptyList());
+    reusableRecord = new DefaultRecord(tableSchema.numColumns(),
+        new String[0]);
   }
 
   @Override
@@ -140,7 +141,8 @@ public class HiveVertexWriter<I extends WritableComparable, V extends Writable,
   }
 
   @Override
-  public void save(HiveRecord record) throws IOException, InterruptedException {
+  public void save(HiveWritableRecord record) throws IOException,
+      InterruptedException {
     hiveRecordWriter.write(NullWritable.get(), record);
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/a68f2bad/giraph-hive/src/main/java/org/apache/giraph/hive/output/SimpleVertexToHive.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/output/SimpleVertexToHive.java b/giraph-hive/src/main/java/org/apache/giraph/hive/output/SimpleVertexToHive.java
index 3e76d87..61de791 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/output/SimpleVertexToHive.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/SimpleVertexToHive.java
@@ -22,8 +22,8 @@ import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-import com.facebook.giraph.hive.HiveRecord;
-import com.facebook.giraph.hive.HiveWritableRecord;
+import com.facebook.giraph.hive.record.HiveRecord;
+import com.facebook.giraph.hive.record.HiveWritableRecord;
 
 import java.io.IOException;
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/a68f2bad/giraph-hive/src/main/java/org/apache/giraph/hive/output/VertexToHive.java
----------------------------------------------------------------------
diff --git a/giraph-hive/src/main/java/org/apache/giraph/hive/output/VertexToHive.java b/giraph-hive/src/main/java/org/apache/giraph/hive/output/VertexToHive.java
index ff5869d..05e81ab 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/output/VertexToHive.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/VertexToHive.java
@@ -22,7 +22,7 @@ import org.apache.giraph.graph.Vertex;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
-import com.facebook.giraph.hive.HiveRecord;
+import com.facebook.giraph.hive.record.HiveRecord;
 
 import java.io.IOException;
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/a68f2bad/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 799bb0a..1e321b8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -881,7 +881,7 @@ under the License.
       <dependency>
         <groupId>com.facebook.giraph.hive</groupId>
         <artifactId>hive-io-experimental</artifactId>
-        <version>0.4</version>
+        <version>0.5</version>
       </dependency>
       <dependency>
         <groupId>com.google.guava</groupId>

Reply via email to