Updated Branches:
  refs/heads/trunk 49dbdf72e -> c2af20485

http://git-wip-us.apache.org/repos/asf/giraph/blob/c2af2048/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/TypedHiveToEdge.java
----------------------------------------------------------------------
diff --git 
a/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/TypedHiveToEdge.java
 
b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/TypedHiveToEdge.java
new file mode 100644
index 0000000..ac62827
--- /dev/null
+++ 
b/giraph-hive/src/main/java/org/apache/giraph/hive/input/edge/TypedHiveToEdge.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.hive.input.edge;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.StrConfOption;
+import org.apache.giraph.hive.types.HiveValueReader;
+import org.apache.giraph.hive.types.HiveVertexIdReader;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.facebook.hiveio.input.HiveInputDescription;
+import com.facebook.hiveio.record.HiveReadableRecord;
+import com.facebook.hiveio.schema.HiveTableSchema;
+
+import java.util.Iterator;
+
+import static org.apache.giraph.hive.types.TypedValueReader.createValueReader;
+import static org.apache.giraph.hive.types.TypedVertexIdReader.createIdReader;
+
+/**
+ * A {@link HiveToEdge} using {@link org.apache.giraph.types.WritableWrapper}s
+ *
+ * @param <I> Vertex ID
+ * @param <E> Edge Value
+ */
+public class TypedHiveToEdge<I extends WritableComparable, E extends Writable>
+    extends SimpleHiveToEdge<I, E> {
+  /** Source ID column name in Hive */
+  public static final StrConfOption EDGE_SOURCE_ID_COLUMN =
+      new StrConfOption("hive.input.edge.source.id.column", null,
+          "Source Vertex ID column");
+  /** Target ID column name in Hive */
+  public static final StrConfOption EDGE_TARGET_ID_COLUMN =
+      new StrConfOption("hive.input.edge.target.id.column", null,
+          "Target Vertex ID column");
+  /** Edge Value column name in Hive */
+  public static final StrConfOption EDGE_VALUE_COLUMN =
+      new StrConfOption("hive.input.edge.value.column", null,
+          "Edge Value column");
+
+  /** Source ID reader */
+  private HiveVertexIdReader<I> sourceIdReader;
+  /** Target ID reader */
+  private HiveVertexIdReader<I> targetIdReader;
+  /** Edge Value reader */
+  private HiveValueReader<E> vertexValueReader;
+
+  @Override
+  public void checkInput(HiveInputDescription inputDesc,
+      HiveTableSchema schema) { }
+
+  @Override
+  public void initializeRecords(Iterator<HiveReadableRecord> records) {
+    super.initializeRecords(records);
+
+    HiveTableSchema schema = getTableSchema();
+    ImmutableClassesGiraphConfiguration conf = getConf();
+
+    sourceIdReader = createIdReader(conf, EDGE_SOURCE_ID_COLUMN, schema);
+    targetIdReader = createIdReader(conf, EDGE_TARGET_ID_COLUMN, schema);
+    vertexValueReader = createValueReader(conf, EDGE_VALUE_COLUMN, schema);
+  }
+
+  @Override
+  public E getEdgeValue(HiveReadableRecord record) {
+    return vertexValueReader.readValue(record);
+  }
+
+  @Override
+  public I getSourceVertexId(
+      HiveReadableRecord record) {
+    return sourceIdReader.readId(record);
+  }
+
+  @Override
+  public I getTargetVertexId(
+      HiveReadableRecord record) {
+    return targetIdReader.readId(record);
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/giraph/blob/c2af2048/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/TypedHiveToVertex.java
----------------------------------------------------------------------
diff --git 
a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/TypedHiveToVertex.java
 
b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/TypedHiveToVertex.java
new file mode 100644
index 0000000..5ca7d34
--- /dev/null
+++ 
b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/TypedHiveToVertex.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.hive.input.vertex;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.StrConfOption;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.hive.types.HiveValueReader;
+import org.apache.giraph.hive.types.HiveVertexIdReader;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.facebook.hiveio.input.HiveInputDescription;
+import com.facebook.hiveio.record.HiveReadableRecord;
+import com.facebook.hiveio.schema.HiveTableSchema;
+import com.google.common.collect.ImmutableList;
+
+import java.util.Iterator;
+
+import static org.apache.giraph.hive.types.TypedValueReader.createValueReader;
+import static org.apache.giraph.hive.types.TypedVertexIdReader.createIdReader;
+
+/**
+ * A {@link HiveToVertex} using {@link org.apache.giraph.types.WritableWrapper}
+ *
+ * @param <I> Vertex ID
+ * @param <V> Vertex Value
+ * @param <E> Edge Value
+ */
+public class TypedHiveToVertex<I extends WritableComparable, V extends 
Writable,
+    E extends Writable> extends SimpleHiveToVertex<I, V, E> {
+  /** Source ID column name in Hive */
+  public static final StrConfOption VERTEX_ID_COLUMN =
+      new StrConfOption("hive.input.vertex.id.column", null,
+          "Vertex ID column");
+  /** Target ID column name in Hive */
+  public static final StrConfOption VERTEX_VALUE_COLUMN =
+      new StrConfOption("hive.input.vertex.value.column", null,
+          "Vertex Value column");
+  /** Edge Value column name in Hive */
+  public static final StrConfOption EDGES_COLUMN =
+      new StrConfOption("hive.input.vertex.edges.column", null,
+          "Edges column");
+
+  /** Vertex ID reader */
+  private HiveVertexIdReader<I> vertexIdReader;
+  /** Vertex Value reader */
+  private HiveValueReader<V> vertexValueReader;
+
+  @Override
+  public void checkInput(HiveInputDescription inputDesc,
+      HiveTableSchema schema) { }
+
+  @Override
+  public void initializeRecords(Iterator<HiveReadableRecord> records) {
+    super.initializeRecords(records);
+
+    HiveTableSchema schema = getTableSchema();
+    ImmutableClassesGiraphConfiguration conf = getConf();
+
+    vertexIdReader = createIdReader(conf, VERTEX_ID_COLUMN, schema);
+    vertexValueReader = createValueReader(conf, VERTEX_VALUE_COLUMN, schema);
+  }
+
+  @Override
+  public Iterable<Edge<I, E>> getEdges(HiveReadableRecord record) {
+    return ImmutableList.of();
+  }
+
+  @Override
+  public I getVertexId(HiveReadableRecord record) {
+    return vertexIdReader.readId(record);
+  }
+
+  @Override
+  public V getVertexValue(HiveReadableRecord record) {
+    return vertexValueReader.readValue(record);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/c2af2048/giraph-hive/src/main/java/org/apache/giraph/hive/output/AbstractVertexToHive.java
----------------------------------------------------------------------
diff --git 
a/giraph-hive/src/main/java/org/apache/giraph/hive/output/AbstractVertexToHive.java
 
b/giraph-hive/src/main/java/org/apache/giraph/hive/output/AbstractVertexToHive.java
index 477ce6e..4046bb2 100644
--- 
a/giraph-hive/src/main/java/org/apache/giraph/hive/output/AbstractVertexToHive.java
+++ 
b/giraph-hive/src/main/java/org/apache/giraph/hive/output/AbstractVertexToHive.java
@@ -31,4 +31,7 @@ import org.apache.hadoop.io.WritableComparable;
 public abstract class AbstractVertexToHive<I extends WritableComparable,
     V extends Writable, E extends Writable>
     extends DefaultConfigurableAndTableSchemaAware<I, V, E>
-    implements VertexToHive<I, V, E> { }
+    implements VertexToHive<I, V, E> {
+  @Override
+  public void initialize() { }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/c2af2048/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java
----------------------------------------------------------------------
diff --git 
a/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java 
b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java
index bb27f25..796bc8f 100644
--- 
a/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java
+++ 
b/giraph-hive/src/main/java/org/apache/giraph/hive/output/HiveVertexWriter.java
@@ -97,6 +97,7 @@ public class HiveVertexWriter<I extends WritableComparable, V 
extends Writable,
   public void initialize(TaskAttemptContext context)
     throws IOException, InterruptedException {
     vertexToHive = HiveUtils.newVertexToHive(getConf(), tableSchema);
+    vertexToHive.initialize();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/giraph/blob/c2af2048/giraph-hive/src/main/java/org/apache/giraph/hive/output/TypedVertexToHive.java
----------------------------------------------------------------------
diff --git 
a/giraph-hive/src/main/java/org/apache/giraph/hive/output/TypedVertexToHive.java
 
b/giraph-hive/src/main/java/org/apache/giraph/hive/output/TypedVertexToHive.java
new file mode 100644
index 0000000..8d6fd9a
--- /dev/null
+++ 
b/giraph-hive/src/main/java/org/apache/giraph/hive/output/TypedVertexToHive.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.hive.output;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.StrConfOption;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.hive.types.HiveValueWriter;
+import org.apache.giraph.hive.types.HiveVertexIdWriter;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.facebook.hiveio.output.HiveOutputDescription;
+import com.facebook.hiveio.record.HiveWritableRecord;
+import com.facebook.hiveio.schema.HiveTableSchema;
+
+import static org.apache.giraph.hive.types.TypedValueWriter.createValueWriter;
+import static org.apache.giraph.hive.types.TypedVertexIdWriter.createIdWriter;
+
+/**
+ * A {@link VertexToHive} using
+ * {@link org.apache.giraph.types.WritableUnwrapper}s.
+ *
+ * @param <I> Vertex ID
+ * @param <V> Vertex Value
+ * @param <E> Edge Value
+ */
+public class TypedVertexToHive<I extends WritableComparable, V extends 
Writable,
+    E extends Writable> extends SimpleVertexToHive<I, V, E> {
+  /** Source ID column name in Hive */
+  public static final StrConfOption VERTEX_ID_COLUMN =
+      new StrConfOption("hive.output.vertex.id.column", null,
+          "Source Vertex ID column");
+  /** Target ID column name in Hive */
+  public static final StrConfOption VERTEX_VALUE_COLUMN =
+      new StrConfOption("hive.output.vertex.value.column", null,
+          "Target Vertex ID column");
+
+  /** Vertex ID writer */
+  private HiveVertexIdWriter<I> vertexIdWriter;
+  /** Vertex Value writer */
+  private HiveValueWriter<V> vertexValueWriter;
+
+  @Override
+  public void checkOutput(HiveOutputDescription outputDesc,
+      HiveTableSchema schema, HiveWritableRecord emptyRecord) { }
+
+  @Override
+  public void initialize() {
+    HiveTableSchema schema = getTableSchema();
+    ImmutableClassesGiraphConfiguration conf = getConf();
+
+    vertexIdWriter = createIdWriter(conf, VERTEX_ID_COLUMN, schema);
+    vertexValueWriter = createValueWriter(conf, VERTEX_VALUE_COLUMN, schema);
+  }
+
+  @Override
+  public void fillRecord(Vertex<I, V, E> vertex, HiveWritableRecord record) {
+    vertexIdWriter.writeId(vertex.getId(), record);
+    vertexValueWriter.writeValue(vertex.getValue(), record);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/c2af2048/giraph-hive/src/main/java/org/apache/giraph/hive/output/VertexToHive.java
----------------------------------------------------------------------
diff --git 
a/giraph-hive/src/main/java/org/apache/giraph/hive/output/VertexToHive.java 
b/giraph-hive/src/main/java/org/apache/giraph/hive/output/VertexToHive.java
index f9537a7..bf13ec1 100644
--- a/giraph-hive/src/main/java/org/apache/giraph/hive/output/VertexToHive.java
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/output/VertexToHive.java
@@ -38,6 +38,12 @@ import java.io.IOException;
 public interface VertexToHive<I extends WritableComparable, V extends Writable,
     E extends Writable> {
   /**
+   * User initialization before any saveVertex() calls but after Configuration
+   * and HiveTableSchema are guaranteed to be set.
+   */
+  void initialize();
+
+  /**
    * Check the output is valid. This method provides information to the user as
    * early as possible so that they may validate they are using the correct
    * input and fail the job early rather than getting into it and waiting a 
long

http://git-wip-us.apache.org/repos/asf/giraph/blob/c2af2048/giraph-hive/src/main/java/org/apache/giraph/hive/types/HiveValueReader.java
----------------------------------------------------------------------
diff --git 
a/giraph-hive/src/main/java/org/apache/giraph/hive/types/HiveValueReader.java 
b/giraph-hive/src/main/java/org/apache/giraph/hive/types/HiveValueReader.java
new file mode 100644
index 0000000..c0fad12
--- /dev/null
+++ 
b/giraph-hive/src/main/java/org/apache/giraph/hive/types/HiveValueReader.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.hive.types;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+
+import com.facebook.hiveio.record.HiveReadableRecord;
+
+/**
+ * Interface for reading Vertex / Edge values from Hive records
+ *
+ * @param <T> Value type
+ */
+public interface HiveValueReader<T extends Writable> {
+  /**
+   * Read value from record
+   *
+   * @param record Hive record
+   * @return value
+   */
+  T readValue(HiveReadableRecord record);
+
+  /**
+   * Null implementation that return NullWritable
+   *
+   * @param <W> Writable type
+   */
+  public class Null<W extends Writable> implements HiveValueReader<W> {
+    /** Singleton */
+    private static final Null INSTANCE = new Null();
+
+    /**
+     * Get singleton
+     *
+     * @return singleton instance
+     */
+    public static Null get() {
+      return INSTANCE;
+    }
+
+    @Override
+    public W readValue(HiveReadableRecord record) {
+      return (W) NullWritable.get();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/c2af2048/giraph-hive/src/main/java/org/apache/giraph/hive/types/HiveValueWriter.java
----------------------------------------------------------------------
diff --git 
a/giraph-hive/src/main/java/org/apache/giraph/hive/types/HiveValueWriter.java 
b/giraph-hive/src/main/java/org/apache/giraph/hive/types/HiveValueWriter.java
new file mode 100644
index 0000000..f38f79a
--- /dev/null
+++ 
b/giraph-hive/src/main/java/org/apache/giraph/hive/types/HiveValueWriter.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.hive.types;
+
+import org.apache.hadoop.io.Writable;
+
+import com.facebook.hiveio.record.HiveWritableRecord;
+
+/**
+ * Interface for writing Vertex / Edge values from Hive records
+ *
+ * @param <T> Value type
+ */
+public interface HiveValueWriter<T extends Writable> {
+  /**
+   * Write value to record
+   *
+   * @param value the value
+   * @param record Hive record
+   */
+  void writeValue(T value, HiveWritableRecord record);
+
+  /**
+   * Null implementation that does nothing
+   *
+   * @param <W> Writable type
+   */
+  public static class Null<W extends Writable> implements HiveValueWriter<W> {
+    /** Singleton */
+    private static final Null INSTANCE = new Null();
+
+    /**
+     * Get singleton
+     *
+     * @return singleton instance
+     */
+    public static Null get() {
+      return INSTANCE;
+    }
+
+    @Override
+    public void writeValue(W value, HiveWritableRecord record) { }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/c2af2048/giraph-hive/src/main/java/org/apache/giraph/hive/types/HiveVertexIdReader.java
----------------------------------------------------------------------
diff --git 
a/giraph-hive/src/main/java/org/apache/giraph/hive/types/HiveVertexIdReader.java
 
b/giraph-hive/src/main/java/org/apache/giraph/hive/types/HiveVertexIdReader.java
new file mode 100644
index 0000000..1ad6f7f
--- /dev/null
+++ 
b/giraph-hive/src/main/java/org/apache/giraph/hive/types/HiveVertexIdReader.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.hive.types;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.facebook.hiveio.record.HiveReadableRecord;
+
+/**
+ * Interface for reading Vertex IDs from Hive records
+ *
+ * @param <I> Vertex ID
+ */
+public interface HiveVertexIdReader<I extends WritableComparable> {
+  /**
+   * Read Vertex ID from Hive record
+   *
+   * @param record Hive record
+   * @return Vertex ID
+   */
+  I readId(HiveReadableRecord record);
+
+  /**
+   * Null implementation that return NullWritable
+   *
+   * @param <W> Writable type
+   */
+  public static class Null<W extends WritableComparable>
+      implements HiveVertexIdReader<W> {
+    /** Singleton */
+    private static final Null INSTANCE = new Null();
+
+    /**
+     * Get singleton
+     *
+     * @return singleton instance
+     */
+    public static Null get() {
+      return INSTANCE;
+    }
+
+    @Override
+    public W readId(HiveReadableRecord record) {
+      return (W) NullWritable.get();
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/giraph/blob/c2af2048/giraph-hive/src/main/java/org/apache/giraph/hive/types/HiveVertexIdWriter.java
----------------------------------------------------------------------
diff --git 
a/giraph-hive/src/main/java/org/apache/giraph/hive/types/HiveVertexIdWriter.java
 
b/giraph-hive/src/main/java/org/apache/giraph/hive/types/HiveVertexIdWriter.java
new file mode 100644
index 0000000..33196f5
--- /dev/null
+++ 
b/giraph-hive/src/main/java/org/apache/giraph/hive/types/HiveVertexIdWriter.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.hive.types;
+
+import org.apache.hadoop.io.WritableComparable;
+
+import com.facebook.hiveio.record.HiveWritableRecord;
+
+/**
+ * Interface for writing Vertex IDs from Hive records
+ *
+ * @param <I> Vertex ID
+ */
+public interface HiveVertexIdWriter<I extends WritableComparable> {
+  /**
+   * Write Vertex ID to record
+   *
+   * @param id Vertex ID
+   * @param record Hive record
+   */
+  void writeId(I id, HiveWritableRecord record);
+
+  /**
+   * Null implementation that does nothing
+   *
+   * @param <W> Writable type
+   */
+  public static class Null<W extends WritableComparable>
+      implements HiveVertexIdWriter<W> {
+    /** Singleton */
+    private static final Null INSTANCE = new Null();
+
+    /**
+     * Get singleton
+     *
+     * @return singleton instance
+     */
+    public static Null get() {
+      return INSTANCE;
+    }
+
+    @Override
+    public void writeId(W id, HiveWritableRecord record) { }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/c2af2048/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedValueReader.java
----------------------------------------------------------------------
diff --git 
a/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedValueReader.java 
b/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedValueReader.java
new file mode 100644
index 0000000..e9b0687
--- /dev/null
+++ 
b/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedValueReader.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.hive.types;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.StrConfOption;
+import org.apache.giraph.types.WritableWrapper;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+
+import com.facebook.hiveio.record.HiveReadableRecord;
+import com.facebook.hiveio.schema.HiveTableSchema;
+
+import static org.apache.giraph.hive.common.HiveUtils.columnIndexOrThrow;
+import static org.apache.giraph.types.WritableWrappers.lookup;
+
+/**
+ * Reader for Vertex/Edge Values from Hive with known types
+ *
+ * @param <W> Vertex/Edge Value
+ */
+public class TypedValueReader<W extends Writable>
+    implements HiveValueReader<W> {
+  /** Hive column index */
+  private final int columnIndex;
+  /** {@link WritableWrapper} for Hive column to Giraph Writable */
+  private final WritableWrapper<W, Object> writableWrapper;
+
+  /**
+   * Constructor
+   *
+   * @param columnIndex column index
+   * @param writableWrapper {@link WritableWrapper}
+   */
+  public TypedValueReader(int columnIndex,
+      WritableWrapper<W, Object> writableWrapper) {
+    this.columnIndex = columnIndex;
+    this.writableWrapper = writableWrapper;
+  }
+
+  /**
+   * Create from Configuration with column name and Schema
+   *
+   * @param conf Configuration
+   * @param columnOption StrConfOption for column name
+   * @param schema HiveTableSchema
+   * @param <V> Vertex/Edge Value
+   * @return TypedVertexValueReader
+   */
+  public static <V extends Writable> HiveValueReader<V>
+  createValueReader(
+        ImmutableClassesGiraphConfiguration conf, StrConfOption columnOption,
+        HiveTableSchema schema) {
+    Class<V> vertexValueClass = conf.getVertexValueClass();
+    if (NullWritable.class.isAssignableFrom(vertexValueClass)) {
+      return HiveValueReader.Null.get();
+    }
+    int columnIndex = columnIndexOrThrow(schema, conf, columnOption);
+    Class hiveClass = schema.columnType(columnIndex).javaClass();
+    WritableWrapper wrapper = lookup(vertexValueClass, hiveClass);
+    return new TypedValueReader(columnIndex, wrapper);
+  }
+
+  @Override
+  public W readValue(HiveReadableRecord record) {
+    Object object = record.get(columnIndex);
+    return writableWrapper.wrap(object);
+  }
+
+  public int getColumnIndex() {
+    return columnIndex;
+  }
+
+  public WritableWrapper<W, Object> getWritableWrapper() {
+    return writableWrapper;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/c2af2048/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedValueWriter.java
----------------------------------------------------------------------
diff --git 
a/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedValueWriter.java 
b/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedValueWriter.java
new file mode 100644
index 0000000..8841bda
--- /dev/null
+++ 
b/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedValueWriter.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.hive.types;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.StrConfOption;
+import org.apache.giraph.types.WritableUnwrapper;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+
+import com.facebook.hiveio.record.HiveWritableRecord;
+import com.facebook.hiveio.schema.HiveTableSchema;
+
+import static org.apache.giraph.hive.common.HiveUtils.columnIndexOrThrow;
+import static org.apache.giraph.types.WritableUnwrappers.lookup;
+
+/**
+ * Writer for Vertex/Edge Values from Hive with known types
+ *
+ * @param <W> Vertex/Edge Value
+ */
+public class TypedValueWriter<W extends Writable>
+    implements HiveValueWriter<W> {
+  /** Hive column index */
+  private final int columnIndex;
+  /** {@link WritableUnwrapper} for Hive column to Giraph Writable */
+  private final WritableUnwrapper<W, Object> writableUnwrapper;
+
+  /**
+   * Constructor
+   *
+   * @param columnIndex column index
+   * @param writableUnwrapper JavaWritableConverter
+   */
+  public TypedValueWriter(int columnIndex,
+      WritableUnwrapper<W, Object> writableUnwrapper) {
+    this.columnIndex = columnIndex;
+    this.writableUnwrapper = writableUnwrapper;
+  }
+
+  /**
+   * Create from Configuration with column name and Schema
+   *
+   * @param conf Configuration
+   * @param columnOption StrConfOption for column name
+   * @param schema HiveTableSchema
+   * @param <V> Vertex/Edge Value
+   * @return TypedVertexValueReader
+   */
+  public static <V extends Writable> HiveValueWriter<V>
+  createValueWriter(
+        ImmutableClassesGiraphConfiguration conf, StrConfOption columnOption,
+        HiveTableSchema schema) {
+    Class<V> vertexValueClass = conf.getVertexValueClass();
+    if (NullWritable.class.isAssignableFrom(vertexValueClass)) {
+      return HiveValueWriter.Null.get();
+    }
+    int columnIndex = columnIndexOrThrow(schema, conf, columnOption);
+    Class hiveClass = schema.columnType(columnIndex).javaClass();
+    WritableUnwrapper unwrapper = lookup(vertexValueClass, hiveClass);
+    return new TypedValueWriter(columnIndex, unwrapper);
+  }
+
+  @Override
+  public void writeValue(W value, HiveWritableRecord record) {
+    Object object = writableUnwrapper.unwrap(value);
+    record.set(columnIndex, object);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/c2af2048/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedVertexIdReader.java
----------------------------------------------------------------------
diff --git 
a/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedVertexIdReader.java
 
b/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedVertexIdReader.java
new file mode 100644
index 0000000..6d80d37
--- /dev/null
+++ 
b/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedVertexIdReader.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.hive.types;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.StrConfOption;
+import org.apache.giraph.types.WritableWrapper;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.facebook.hiveio.record.HiveReadableRecord;
+import com.facebook.hiveio.schema.HiveTableSchema;
+
+import static org.apache.giraph.hive.common.HiveUtils.columnIndexOrThrow;
+import static org.apache.giraph.types.WritableWrappers.lookup;
+
+/**
+ * Reader for Vertex IDs from Hive with known types
+ *
+ * @param <I> Vertex ID
+ */
+public class TypedVertexIdReader<I extends WritableComparable>
+    implements HiveVertexIdReader<I> {
+  /** Hive column index */
+  private final int columnIndex;
+  /** {@link WritableWrapper} for Hive column to Giraph Writable */
+  private final WritableWrapper<I, Object> writableWrapper;
+
+  /**
+   * Constructor
+   *
+   * @param columnIndex column index
+   * @param writableWrapper {@link WritableWrapper}
+   */
+  public TypedVertexIdReader(int columnIndex,
+      WritableWrapper<I, Object> writableWrapper) {
+    this.columnIndex = columnIndex;
+    this.writableWrapper = writableWrapper;
+  }
+
+  /**
+   * Create from Configuration with column name and Schema
+   *
+   * @param conf {@link org.apache.hadoop.conf.Configuration}
+   * @param columnOption {@link StrConfOption} for column name
+   * @param schema {@link HiveTableSchema}
+   * @param <I> Vertex ID
+   * @return {@link TypedVertexIdReader}
+   */
+  public static <I extends WritableComparable> HiveVertexIdReader<I>
+  createIdReader(
+      ImmutableClassesGiraphConfiguration conf, StrConfOption columnOption,
+      HiveTableSchema schema) {
+    Class<I> vertexIdClass = conf.getVertexIdClass();
+    if (NullWritable.class.isAssignableFrom(vertexIdClass)) {
+      return HiveVertexIdReader.Null.get();
+    }
+    int columnIndex = columnIndexOrThrow(schema, conf, columnOption);
+    Class hiveClass = schema.columnType(columnIndex).javaClass();
+    WritableWrapper wrapper = lookup(vertexIdClass, hiveClass);
+    return new TypedVertexIdReader<I>(columnIndex, wrapper);
+  }
+
+  @Override
+  public I readId(HiveReadableRecord record) {
+    Object object = record.get(columnIndex);
+    return writableWrapper.wrap(object);
+  }
+
+  public int getColumnIndex() {
+    return columnIndex;
+  }
+
+  public WritableWrapper<I, Object> getWritableWrapper() {
+    return writableWrapper;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/giraph/blob/c2af2048/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedVertexIdWriter.java
----------------------------------------------------------------------
diff --git 
a/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedVertexIdWriter.java
 
b/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedVertexIdWriter.java
new file mode 100644
index 0000000..aece54b
--- /dev/null
+++ 
b/giraph-hive/src/main/java/org/apache/giraph/hive/types/TypedVertexIdWriter.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.hive.types;
+
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.conf.StrConfOption;
+import org.apache.giraph.types.WritableUnwrapper;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.WritableComparable;
+
+import com.facebook.hiveio.record.HiveWritableRecord;
+import com.facebook.hiveio.schema.HiveTableSchema;
+
+import static org.apache.giraph.hive.common.HiveUtils.columnIndexOrThrow;
+import static org.apache.giraph.types.WritableUnwrappers.lookup;
+
+/**
+ * Writer for Vertex IDs from Hive with known types
+ *
+ * @param <I> Vertex ID
+ */
+public class TypedVertexIdWriter<I extends WritableComparable>
+    implements HiveVertexIdWriter<I> {
+  /** Hive column index */
+  private final int columnIndex;
+  /** {@link WritableUnwrapper} for Hive column to Giraph Writable */
+  private final WritableUnwrapper<I, Object> writableUnwrapper;
+
+  /**
+   * Constructor
+   *
+   * @param columnIndex column index
+   * @param writableUnwrapper {@link WritableUnwrapper}
+   */
+  public TypedVertexIdWriter(int columnIndex,
+      WritableUnwrapper<I, Object> writableUnwrapper) {
+    this.columnIndex = columnIndex;
+    this.writableUnwrapper = writableUnwrapper;
+  }
+
+  /**
+   * Create from Configuration with column name and Schema
+   *
+   * @param conf {@link org.apache.hadoop.conf.Configuration}
+   * @param columnOption {@link StrConfOption} for column name
+   * @param schema {@link HiveTableSchema}
+   * @param <I> Vertex ID
+   * @return {@link TypedVertexIdWriter}
+   */
+  public static <I extends WritableComparable> HiveVertexIdWriter<I>
+  createIdWriter(
+      ImmutableClassesGiraphConfiguration conf, StrConfOption columnOption,
+      HiveTableSchema schema) {
+    Class<I> vertexIdClass = conf.getVertexIdClass();
+    if (NullWritable.class.isAssignableFrom(vertexIdClass)) {
+      return HiveVertexIdWriter.Null.get();
+    }
+    int columnIndex = columnIndexOrThrow(schema, conf, columnOption);
+    Class hiveClass = schema.columnType(columnIndex).javaClass();
+    WritableUnwrapper unwrapper = lookup(vertexIdClass, hiveClass);
+    return new TypedVertexIdWriter<I>(columnIndex, unwrapper);
+  }
+
+  @Override
+  public void writeId(I value, HiveWritableRecord record) {
+    Object object = writableUnwrapper.unwrap(value);
+    record.set(columnIndex, object);
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/c2af2048/giraph-hive/src/main/java/org/apache/giraph/hive/types/package-info.java
----------------------------------------------------------------------
diff --git 
a/giraph-hive/src/main/java/org/apache/giraph/hive/types/package-info.java 
b/giraph-hive/src/main/java/org/apache/giraph/hive/types/package-info.java
new file mode 100644
index 0000000..7004637
--- /dev/null
+++ b/giraph-hive/src/main/java/org/apache/giraph/hive/types/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * HiveIO type-based things.
+ */
+package org.apache.giraph.hive.types;

Reply via email to