http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/test/java/org/apache/giraph/io/TestEdgeInput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/io/TestEdgeInput.java b/giraph-core/src/test/java/org/apache/giraph/io/TestEdgeInput.java
deleted file mode 100644
index 45d946f..0000000
--- a/giraph-core/src/test/java/org/apache/giraph/io/TestEdgeInput.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.giraph.io;
-
-import org.apache.giraph.BspCase;
-import org.apache.giraph.conf.GiraphConfiguration;
-import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
-import org.apache.giraph.edge.ByteArrayEdges;
-import org.apache.giraph.edge.Edge;
-import org.apache.giraph.graph.Vertex;
-import org.apache.giraph.factories.VertexValueFactory;
-import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
-import org.apache.giraph.io.formats.IntIntTextVertexValueInputFormat;
-import org.apache.giraph.io.formats.IntNullReverseTextEdgeInputFormat;
-import org.apache.giraph.io.formats.IntNullTextEdgeInputFormat;
-import org.apache.giraph.utils.ComputationCountEdges;
-import org.apache.giraph.utils.IntIntNullNoOpComputation;
-import org.apache.giraph.utils.InternalVertexRunner;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.junit.Test;
-
-import com.google.common.collect.Maps;
-
-import java.io.IOException;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-/**
- * A test case to ensure that loading a graph from a list of edges works as
- * expected.
- */
-public class TestEdgeInput extends BspCase {
-  public TestEdgeInput() {
-    super(TestEdgeInput.class.getName());
-  }
-
-  // It should be able to build a graph starting from the edges only.
-  // Vertices should be implicitly created with default values.
-  @Test
-  public void testEdgesOnly() throws Exception {
-    String[] edges = new String[] {
-        "1 2",
-        "2 3",
-        "2 4",
-        "4 1"
-    };
-
-    GiraphConfiguration conf = new GiraphConfiguration();
-    conf.setComputationClass(ComputationCountEdges.class);
-    conf.setOutEdgesClass(ByteArrayEdges.class);
-    conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
-    conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
-    Iterable<String> results = InternalVertexRunner.run(conf, null, edges);
-
-    Map<Integer, Integer> values = parseResults(results);
-
-    // Check that all vertices with outgoing edges have been created
-    assertEquals(3, values.size());
-    // Check the number of edges for each vertex
-    assertEquals(1, (int) values.get(1));
-    assertEquals(2, (int) values.get(2));
-    assertEquals(1, (int) values.get(4));
-  }
-
-  // It should be able to build a graph starting from the edges only.
-  // Using ReverseEdgeDuplicator it should also create the reverse edges.
-  // Vertices should be implicitly created with default values.
-  @Test
-  public void testEdgesOnlyWithReverse() throws Exception {
-    String[] edges = new String[] {
-        "1 2",
-        "2 3",
-        "2 4",
-        "4 1"
-    };
-
-    GiraphConfiguration conf = new GiraphConfiguration();
-    conf.setComputationClass(ComputationCountEdges.class);
-    conf.setOutEdgesClass(ByteArrayEdges.class);
-    conf.setEdgeInputFormatClass(IntNullReverseTextEdgeInputFormat.class);
-    conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
-    Iterable<String> results = InternalVertexRunner.run(conf, null, edges);
-
-    Map<Integer, Integer> values = parseResults(results);
-
-    // Check that all vertices with outgoing edges have been created
-    assertEquals(4, values.size());
-    // Check the number of edges for each vertex
-    assertEquals(2, (int) values.get(1));
-    assertEquals(3, (int) values.get(2));
-    assertEquals(1, (int) values.get(3));
-    assertEquals(2, (int) values.get(4));
-  }
-
-  // It should be able to build a graph by specifying vertex data and edges
-  // as separate input formats.
-  @Test
-  public void testMixedFormat() throws Exception {
-    String[] vertices = new String[] {
-        "1 75",
-        "2 34",
-        "3 13",
-        "4 32"
-    };
-    String[] edges = new String[] {
-        "1 2",
-        "2 3",
-        "2 4",
-        "4 1",
-        "5 3"
-    };
-
-    GiraphConfiguration conf = new GiraphConfiguration();
-    conf.setComputationClass(IntIntNullNoOpComputation.class);
-    conf.setOutEdgesClass(ByteArrayEdges.class);
-    conf.setVertexInputFormatClass(IntIntTextVertexValueInputFormat.class);
-    conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
-    conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
-
-    // Run a job with a vertex that does nothing
-    Iterable<String> results = InternalVertexRunner.run(conf, vertices, edges);
-
-    Map<Integer, Integer> values = parseResults(results);
-
-    // Check that all vertices with either initial values or outgoing edges
-    // have been created
-    assertEquals(5, values.size());
-    // Check that the vertices have been created with correct values
-    assertEquals(75, (int) values.get(1));
-    assertEquals(34, (int) values.get(2));
-    assertEquals(13, (int) values.get(3));
-    assertEquals(32, (int) values.get(4));
-    // A vertex with edges but no initial value should have the default value
-    assertEquals(0, (int) values.get(5));
-
-    // Run a job with a custom VertexValueFactory
-    conf.setVertexValueFactoryClass(TestVertexValueFactory.class);
-    results = InternalVertexRunner.run(conf, vertices, edges);
-    values = parseResults(results);
-    // A vertex with edges but no initial value should have been constructed
-    // by the custom factory
-    assertEquals(3, (int) values.get(5));
-
-    conf = new GiraphConfiguration();
-    conf.setComputationClass(ComputationCountEdges.class);
-    conf.setOutEdgesClass(ByteArrayEdges.class);
-    conf.setVertexInputFormatClass(IntIntTextVertexValueInputFormat.class);
-    conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
-    conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
-
-    // Run a job with a vertex that counts outgoing edges
-    results = InternalVertexRunner.run(conf, vertices, edges);
-
-    values = parseResults(results);
-
-    // Check the number of edges for each vertex
-    assertEquals(1, (int) values.get(1));
-    assertEquals(2, (int) values.get(2));
-    assertEquals(0, (int) values.get(3));
-    assertEquals(1, (int) values.get(4));
-    assertEquals(1, (int) values.get(5));
-  }
-
-  // It should use the specified input OutEdges class.
-  @Test
-  public void testDifferentInputEdgesClass() throws Exception {
-    String[] edges = new String[] {
-        "1 2",
-        "2 3",
-        "2 4",
-        "4 1"
-    };
-
-    GiraphConfiguration conf = new GiraphConfiguration();
-    conf.setComputationClass(TestComputationCheckEdgesType.class);
-    conf.setOutEdgesClass(ByteArrayEdges.class);
-    conf.setInputOutEdgesClass(TestOutEdgesFilterEven.class);
-    conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
-    conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
-    Iterable<String> results = InternalVertexRunner.run(conf, null, edges);
-
-    Map<Integer, Integer> values = parseResults(results);
-
-    // Check that all vertices with outgoing edges in the input have been
-    // created
-    assertEquals(3, values.size());
-    // Check the number of edges for each vertex (edges with odd target id
-    // should have been removed)
-    assertEquals(1, (int) values.get(1));
-    assertEquals(1, (int) values.get(2));
-    assertEquals(0, (int) values.get(4));
-  }
-
-  public static class TestComputationCheckEdgesType extends
-      ComputationCountEdges {
-    @Override
-    public void compute(Vertex<IntWritable, IntWritable, NullWritable> vertex,
-        Iterable<NullWritable> messages) throws IOException {
-      assertFalse(vertex.getEdges() instanceof TestOutEdgesFilterEven);
-      assertTrue(vertex.getEdges() instanceof ByteArrayEdges);
-      super.compute(vertex, messages);
-    }
-  }
-
-  public static class TestVertexValueFactory
-      implements VertexValueFactory<IntWritable> {
-    @Override
-    public void initialize(ImmutableClassesGiraphConfiguration conf) { }
-
-    @Override
-    public Class<IntWritable> getValueClass() {
-      return IntWritable.class;
-    }
-
-    @Override
-    public IntWritable newInstance() {
-      return new IntWritable(3);
-    }
-  }
-
-  public static class TestOutEdgesFilterEven
-      extends ByteArrayEdges<IntWritable, NullWritable> {
-    @Override
-    public void add(Edge<IntWritable, NullWritable> edge) {
-      if (edge.getTargetVertexId().get() % 2 == 0) {
-        super.add(edge);
-      }
-    }
-  }
-
-  private static Map<Integer, Integer> parseResults(Iterable<String> results) {
-    Map<Integer, Integer> values = Maps.newHashMap();
-    for (String line : results) {
-      String[] tokens = line.split("\\s+");
-      int id = Integer.valueOf(tokens[0]);
-      int value = Integer.valueOf(tokens[1]);
-      values.put(id, value);
-    }
-    return values;
-  }
-}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/test/java/org/apache/giraph/io/TestVertexEdgeInput.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/io/TestVertexEdgeInput.java b/giraph-core/src/test/java/org/apache/giraph/io/TestVertexEdgeInput.java
new file mode 100644
index 0000000..721a74c
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/io/TestVertexEdgeInput.java
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.io;
+
+import com.google.common.collect.Maps;
+import org.apache.giraph.BspCase;
+import org.apache.giraph.conf.GiraphConfiguration;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.edge.ByteArrayEdges;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.factories.VertexValueFactory;
+import org.apache.giraph.graph.Vertex;
+import org.apache.giraph.graph.VertexValueCombiner;
+import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
+import org.apache.giraph.io.formats.IntIntNullTextVertexInputFormat;
+import org.apache.giraph.io.formats.IntIntTextVertexValueInputFormat;
+import org.apache.giraph.io.formats.IntNullReverseTextEdgeInputFormat;
+import org.apache.giraph.io.formats.IntNullTextEdgeInputFormat;
+import org.apache.giraph.utils.ComputationCountEdges;
+import org.apache.giraph.utils.IntIntNullNoOpComputation;
+import org.apache.giraph.utils.InternalVertexRunner;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+/**
+ * A test case to ensure that loading a graph from vertices and edges works as
+ * expected.
+ */
+public class TestVertexEdgeInput extends BspCase {
+  public TestVertexEdgeInput() {
+    super(TestVertexEdgeInput.class.getName());
+  }
+
+  // It should be able to build a graph starting from the edges only.
+  // Vertices should be implicitly created with default values.
+  @Test
+  public void testEdgesOnly() throws Exception {
+    String[] edges = new String[] {
+        "1 2",
+        "2 3",
+        "2 4",
+        "4 1"
+    };
+
+    GiraphConfiguration conf = new GiraphConfiguration();
+    conf.setComputationClass(ComputationCountEdges.class);
+    conf.setOutEdgesClass(ByteArrayEdges.class);
+    conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
+    conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+    Iterable<String> results = InternalVertexRunner.run(conf, null, edges);
+
+    Map<Integer, Integer> values = parseResults(results);
+
+    // Check that all vertices with outgoing edges have been created
+    assertEquals(3, values.size());
+    // Check the number of edges for each vertex
+    assertEquals(1, (int) values.get(1));
+    assertEquals(2, (int) values.get(2));
+    assertEquals(1, (int) values.get(4));
+  }
+
+  // It should be able to build a graph starting from the edges only.
+  // Using ReverseEdgeDuplicator it should also create the reverse edges.
+  // Vertices should be implicitly created with default values.
+  @Test
+  public void testEdgesOnlyWithReverse() throws Exception {
+    String[] edges = new String[] {
+        "1 2",
+        "2 3",
+        "2 4",
+        "4 1"
+    };
+
+    GiraphConfiguration conf = new GiraphConfiguration();
+    conf.setComputationClass(ComputationCountEdges.class);
+    conf.setOutEdgesClass(ByteArrayEdges.class);
+    conf.setEdgeInputFormatClass(IntNullReverseTextEdgeInputFormat.class);
+    conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+    Iterable<String> results = InternalVertexRunner.run(conf, null, edges);
+
+    Map<Integer, Integer> values = parseResults(results);
+
+    // Check that all vertices with outgoing edges have been created
+    assertEquals(4, values.size());
+    // Check the number of edges for each vertex
+    assertEquals(2, (int) values.get(1));
+    assertEquals(3, (int) values.get(2));
+    assertEquals(1, (int) values.get(3));
+    assertEquals(2, (int) values.get(4));
+  }
+
+  /**
+   * Simple vertex value combiner that sums up the vertex values.
+   */
+  public static class IntSumVertexValueCombiner implements VertexValueCombiner<IntWritable> {
+    @Override
+    public void combine(IntWritable originalVertexValue, IntWritable vertexValue) {
+      originalVertexValue.set(originalVertexValue.get() + vertexValue.get());
+    }
+  }
+
+  // It should be able to build a graph by specifying vertex value data
+  // and combining the duplicates (summation).  Edges should be added as well.
+  @Test
+  public void testVertexValueCombiner() throws Exception {
+    String[] vertices = new String[] {
+        "1 75 2",
+        "2 34 3",
+        "3 13",
+        "4 32",
+        "1 11",
+        "2 23 1",
+        "2 3"
+    };
+
+    GiraphConfiguration conf = new GiraphConfiguration();
+    conf.setComputationClass(IntIntNullNoOpComputation.class);
+    conf.setOutEdgesClass(ByteArrayEdges.class);
+    conf.setVertexInputFormatClass(IntIntNullTextVertexInputFormat.class);
+    conf.setVertexValueCombinerClass(IntSumVertexValueCombiner.class);
+    conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+
+    // Run a job with a vertex that does nothing
+    Iterable<String> results = InternalVertexRunner.run(conf, vertices);
+
+    Map<Integer, Integer> values = parseResults(results);
+
+    // Check that all vertices were created
+    assertEquals(4, values.size());
+    // Check that the vertices have been created with correct values
+    assertEquals(86, (int) values.get(1));
+    assertEquals(60, (int) values.get(2));
+    assertEquals(13, (int) values.get(3));
+    assertEquals(32, (int) values.get(4));
+
+    // Run a job with a vertex that counts outgoing edges
+    conf.setComputationClass(ComputationCountEdges.class);
+    results = InternalVertexRunner.run(conf, vertices);
+
+    // Check that the edges were added as well
+    values = parseResults(results);
+    assertEquals(1, (int) values.get(1));
+    assertEquals(2, (int) values.get(2));
+    assertEquals(0, (int) values.get(3));
+    assertEquals(0, (int) values.get(4));
+  }
+
+  // It should be able to build a graph by specifying vertex value data
+  // and edges as separate input formats.
+  @Test
+  public void testMixedVertexValueEdgeFormat() throws Exception {
+    String[] vertices = new String[] {
+        "1 75",
+        "2 34",
+        "3 13",
+        "4 32"
+    };
+    String[] edges = new String[] {
+        "1 2",
+        "2 3",
+        "2 4",
+        "4 1",
+        "5 3"
+    };
+
+    GiraphConfiguration conf = new GiraphConfiguration();
+    conf.setComputationClass(IntIntNullNoOpComputation.class);
+    conf.setOutEdgesClass(ByteArrayEdges.class);
+    conf.setVertexInputFormatClass(IntIntTextVertexValueInputFormat.class);
+    conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
+    conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+
+    // Run a job with a vertex that does nothing
+    Iterable<String> results = InternalVertexRunner.run(conf, vertices, edges);
+
+    Map<Integer, Integer> values = parseResults(results);
+
+    // Check that all vertices with either initial values or outgoing edges
+    // have been created
+    assertEquals(5, values.size());
+    // Check that the vertices have been created with correct values
+    assertEquals(75, (int) values.get(1));
+    assertEquals(34, (int) values.get(2));
+    assertEquals(13, (int) values.get(3));
+    assertEquals(32, (int) values.get(4));
+    // A vertex with edges but no initial value should have the default value
+    assertEquals(0, (int) values.get(5));
+
+    // Run a job with a custom VertexValueFactory
+    conf.setVertexValueFactoryClass(TestVertexValueFactory.class);
+    results = InternalVertexRunner.run(conf, vertices, edges);
+    values = parseResults(results);
+    // A vertex with edges but no initial value should have been constructed
+    // by the custom factory
+    assertEquals(3, (int) values.get(5));
+
+    conf = new GiraphConfiguration();
+    conf.setComputationClass(ComputationCountEdges.class);
+    conf.setOutEdgesClass(ByteArrayEdges.class);
+    conf.setVertexInputFormatClass(IntIntTextVertexValueInputFormat.class);
+    conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
+    conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+
+    // Run a job with a vertex that counts outgoing edges
+    results = InternalVertexRunner.run(conf, vertices, edges);
+
+    values = parseResults(results);
+
+    // Check the number of edges for each vertex
+    assertEquals(1, (int) values.get(1));
+    assertEquals(2, (int) values.get(2));
+    assertEquals(0, (int) values.get(3));
+    assertEquals(1, (int) values.get(4));
+    assertEquals(1, (int) values.get(5));
+  }
+
+  // It should be able to build a graph by specifying vertices and edges
+  // as separate input formats.
+  @Test
+  public void testMixedVertexEdgeFormat() throws Exception {
+    String[] vertices = new String[] {
+        "1 75 2 3",
+        "2 34 1 5",
+        "3 13",
+        "4 32"
+    };
+    String[] edges = new String[] {
+        "1 2",
+        "2 3",
+        "2 4",
+        "4 1",
+        "5 3"
+    };
+
+    GiraphConfiguration conf = new GiraphConfiguration();
+    conf.setComputationClass(IntIntNullNoOpComputation.class);
+    conf.setOutEdgesClass(ByteArrayEdges.class);
+    conf.setVertexInputFormatClass(IntIntNullTextVertexInputFormat.class);
+    conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
+    conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+
+    // Run a job with a vertex that does nothing
+    Iterable<String> results = InternalVertexRunner.run(conf, vertices, edges);
+
+    Map<Integer, Integer> values = parseResults(results);
+
+    // Check that all vertices with either initial values or outgoing edges
+    // have been created
+    assertEquals(5, values.size());
+    // Check that the vertices have been created with correct values
+    assertEquals(75, (int) values.get(1));
+    assertEquals(34, (int) values.get(2));
+    assertEquals(13, (int) values.get(3));
+    assertEquals(32, (int) values.get(4));
+    // A vertex with edges but no initial value should have the default value
+    assertEquals(0, (int) values.get(5));
+
+    // Run a job with a custom VertexValueFactory
+    conf.setVertexValueFactoryClass(TestVertexValueFactory.class);
+    results = InternalVertexRunner.run(conf, vertices, edges);
+    values = parseResults(results);
+    // A vertex with edges but no initial value should have been constructed
+    // by the custom factory
+    assertEquals(3, (int) values.get(5));
+
+    conf = new GiraphConfiguration();
+    conf.setComputationClass(ComputationCountEdges.class);
+    conf.setOutEdgesClass(ByteArrayEdges.class);
+    conf.setVertexInputFormatClass(IntIntNullTextVertexInputFormat.class);
+    conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
+    conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+
+    // Run a job with a vertex that counts outgoing edges
+    results = InternalVertexRunner.run(conf, vertices, edges);
+
+    values = parseResults(results);
+
+    // Check the number of edges for each vertex
+    assertEquals(3, (int) values.get(1));
+    assertEquals(4, (int) values.get(2));
+    assertEquals(0, (int) values.get(3));
+    assertEquals(1, (int) values.get(4));
+    assertEquals(1, (int) values.get(5));
+  }
+
+  // It should use the specified input OutEdges class.
+  @Test
+  public void testDifferentInputEdgesClass() throws Exception {
+    String[] edges = new String[] {
+        "1 2",
+        "2 3",
+        "2 4",
+        "4 1"
+    };
+
+    GiraphConfiguration conf = new GiraphConfiguration();
+    conf.setComputationClass(TestComputationCheckEdgesType.class);
+    conf.setOutEdgesClass(ByteArrayEdges.class);
+    conf.setInputOutEdgesClass(TestOutEdgesFilterEven.class);
+    conf.setEdgeInputFormatClass(IntNullTextEdgeInputFormat.class);
+    conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
+    Iterable<String> results = InternalVertexRunner.run(conf, null, edges);
+
+    Map<Integer, Integer> values = parseResults(results);
+
+    // Check that all vertices with outgoing edges in the input have been
+    // created
+    assertEquals(3, values.size());
+    // Check the number of edges for each vertex (edges with odd target id
+    // should have been removed)
+    assertEquals(1, (int) values.get(1));
+    assertEquals(1, (int) values.get(2));
+    assertEquals(0, (int) values.get(4));
+  }
+
+  public static class TestComputationCheckEdgesType extends
+      ComputationCountEdges {
+    @Override
+    public void compute(Vertex<IntWritable, IntWritable, NullWritable> vertex,
+        Iterable<NullWritable> messages) throws IOException {
+      assertFalse(vertex.getEdges() instanceof TestOutEdgesFilterEven);
+      assertTrue(vertex.getEdges() instanceof ByteArrayEdges);
+      super.compute(vertex, messages);
+    }
+  }
+
+  public static class TestVertexValueFactory
+      implements VertexValueFactory<IntWritable> {
+    @Override
+    public void initialize(ImmutableClassesGiraphConfiguration conf) { }
+
+    @Override
+    public Class<IntWritable> getValueClass() {
+      return IntWritable.class;
+    }
+
+    @Override
+    public IntWritable newInstance() {
+      return new IntWritable(3);
+    }
+  }
+
+  public static class TestOutEdgesFilterEven
+      extends ByteArrayEdges<IntWritable, NullWritable> {
+    @Override
+    public void add(Edge<IntWritable, NullWritable> edge) {
+      if (edge.getTargetVertexId().get() % 2 == 0) {
+        super.add(edge);
+      }
+    }
+  }
+
+  private static Map<Integer, Integer> parseResults(Iterable<String> results) {
+    Map<Integer, Integer> values = Maps.newHashMap();
+    for (String line : results) {
+      String[] tokens = line.split("\\s+");
+      int id = Integer.valueOf(tokens[0]);
+      int value = Integer.valueOf(tokens[1]);
+      values.put(id, value);
+    }
+    return values;
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java b/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java
index b62775f..4a8caaa 100644
--- a/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java
+++ b/giraph-core/src/test/java/org/apache/giraph/master/TestComputationCombinerTypes.java
@@ -18,7 +18,7 @@
 
 package org.apache.giraph.master;
 
-import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.graph.AbstractComputation;
@@ -47,7 +47,7 @@ public class TestComputationCombinerTypes {
   public void testAllMatchWithCombiner() {
     SuperstepClasses classes =
         new SuperstepClasses(IntIntIntLongDoubleComputation.class,
-            IntDoubleCombiner.class);
+            IntDoubleMessageCombiner.class);
     classes.verifyTypesMatch(
         createConfiguration(IntIntIntIntLongComputation.class), true);
   }
@@ -88,7 +88,7 @@ public class TestComputationCombinerTypes {
   public void testDifferentCombinerIdType() {
     SuperstepClasses classes =
         new SuperstepClasses(IntIntIntLongDoubleComputation.class,
-            DoubleDoubleCombiner.class);
+            DoubleDoubleMessageCombiner.class);
     classes.verifyTypesMatch(
         createConfiguration(IntIntIntIntLongComputation.class), true);
   }
@@ -97,7 +97,7 @@ public class TestComputationCombinerTypes {
   public void testDifferentCombinerMessageType() {
     SuperstepClasses classes =
         new SuperstepClasses(IntIntIntLongDoubleComputation.class,
-            IntLongCombiner.class);
+            IntLongMessageCombiner.class);
     classes.verifyTypesMatch(
         createConfiguration(IntIntIntIntLongComputation.class), true);
   }
@@ -138,8 +138,8 @@ public class TestComputationCombinerTypes {
       NoOpComputation<IntWritable, IntWritable, LongWritable, LongWritable,
           IntWritable> { }
 
-  private static class NoOpCombiner<I extends WritableComparable,
-      M extends Writable> extends Combiner<I, M> {
+  private static class NoOpMessageCombiner<I extends WritableComparable,
+      M extends Writable> extends MessageCombiner<I, M> {
     @Override
     public void combine(I vertexIndex, M originalMessage, M messageToCombine) {
     }
@@ -150,12 +150,15 @@ public class TestComputationCombinerTypes {
     }
   }
 
-  private static class IntDoubleCombiner extends NoOpCombiner<IntWritable,
-      DoubleWritable> { }
+  private static class IntDoubleMessageCombiner
+      extends NoOpMessageCombiner<IntWritable,
+                  DoubleWritable> { }
 
-  private static class DoubleDoubleCombiner extends NoOpCombiner<DoubleWritable,
-      DoubleWritable> { }
+  private static class DoubleDoubleMessageCombiner
+      extends NoOpMessageCombiner<DoubleWritable,
+                  DoubleWritable> { }
 
-  private static class IntLongCombiner extends NoOpCombiner<IntWritable,
-      LongWritable> { }
+  private static class IntLongMessageCombiner
+      extends NoOpMessageCombiner<IntWritable,
+                  LongWritable> { }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java b/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java
index 6b0ed35..e96fd12 100644
--- a/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java
+++ b/giraph-core/src/test/java/org/apache/giraph/master/TestSwitchClasses.java
@@ -18,7 +18,7 @@
 
 package org.apache.giraph.master;
 
-import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.graph.AbstractComputation;
 import org.apache.giraph.graph.Vertex;
@@ -40,7 +40,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
 
-/** Test switching Computation and Combiner class during application */
+/** Test switching Computation and MessageCombiner class during application */
 public class TestSwitchClasses {
   @Test
   public void testSwitchingClasses() throws Exception {
@@ -57,32 +57,41 @@ public class TestSwitchClasses {
     graph = InternalVertexRunner.run(conf, graph);
 
     Assert.assertEquals(2, graph.getVertices().size());
-    StatusValue value1 = graph.getVertex(id1).getValue();
-    StatusValue value2 = graph.getVertex(id2).getValue();
+  }
 
+  private static void checkVerticesOnFinalSuperstep(
+      Vertex<IntWritable, StatusValue, IntWritable> vertex) {
     // Check that computations were performed in expected order
-    ArrayList<Integer> expectedComputations = Lists.newArrayList(1, 1, 2, 3, 1);
-    checkComputations(expectedComputations, value1.computations);
-    checkComputations(expectedComputations, value2.computations);
-
+    final ArrayList<Integer> expectedComputations =
+        Lists.newArrayList(1, 1, 2, 3, 1);
+    checkComputations(expectedComputations, vertex.getValue().computations);
     // Check that messages were sent in the correct superstep,
     // and combined when needed
-    ArrayList<HashSet<Double>> messages1 =
-        Lists.newArrayList(
-            Sets.<Double>newHashSet(),
-            Sets.<Double>newHashSet(11d),
-            Sets.<Double>newHashSet(11d),
-            Sets.<Double>newHashSet(101.5, 201.5),
-            Sets.<Double>newHashSet(3002d));
-    checkMessages(messages1, value1.messagesReceived);
-    ArrayList<HashSet<Double>> messages2 =
-        Lists.newArrayList(
-            Sets.<Double>newHashSet(),
-            Sets.<Double>newHashSet(12d),
-            Sets.<Double>newHashSet(12d),
-            Sets.<Double>newHashSet(102.5, 202.5),
-            Sets.<Double>newHashSet(3004d));
-    checkMessages(messages2, value2.messagesReceived);
+    switch (vertex.getId().get()) {
+      case 1:
+        ArrayList<HashSet<Double>> messages1 =
+            Lists.newArrayList(
+                Sets.<Double>newHashSet(),
+                Sets.<Double>newHashSet(11d),
+                Sets.<Double>newHashSet(11d),
+                Sets.<Double>newHashSet(101.5, 201.5),
+                Sets.<Double>newHashSet(3002d));
+        checkMessages(messages1, vertex.getValue().messagesReceived);
+        break;
+      case 2:
+        ArrayList<HashSet<Double>> messages2 =
+            Lists.newArrayList(
+                Sets.<Double>newHashSet(),
+                Sets.<Double>newHashSet(12d),
+                Sets.<Double>newHashSet(12d),
+                Sets.<Double>newHashSet(102.5, 202.5),
+                Sets.<Double>newHashSet(3004d));
+        checkMessages(messages2, vertex.getValue().messagesReceived);
+        break;
+      default:
+        throw new IllegalStateException("checkVertices: Illegal vertex " +
+            vertex);
+    }
   }
 
   private static void checkComputations(ArrayList<Integer> expected,
@@ -113,7 +122,7 @@ public class TestSwitchClasses {
       switch ((int) getSuperstep()) {
         case 0:
           setComputation(Computation1.class);
-          setCombiner(MinimumCombiner.class);
+          setMessageCombiner(MinimumMessageCombiner.class);
           break;
         case 1:
           // test classes don't change
@@ -121,11 +130,11 @@ public class TestSwitchClasses {
         case 2:
           setComputation(Computation2.class);
           // test combiner removed
-          setCombiner(null);
+          setMessageCombiner(null);
           break;
         case 3:
           setComputation(Computation3.class);
-          setCombiner(SumCombiner.class);
+          setMessageCombiner(SumMessageCombiner.class);
           break;
         case 4:
           setComputation(Computation1.class);
@@ -147,6 +156,10 @@ public class TestSwitchClasses {
       IntWritable otherId = new IntWritable(3 - vertex.getId().get());
       sendMessage(otherId, new IntWritable(otherId.get() + 10));
       sendMessage(otherId, new IntWritable(otherId.get() + 20));
+      // Check the vertices on the final superstep
+      if (getSuperstep() == 4) {
+        checkVerticesOnFinalSuperstep(vertex);
+      }
     }
   }
 
@@ -179,8 +192,9 @@ public class TestSwitchClasses {
     }
   }
 
-  public static class MinimumCombiner extends Combiner<IntWritable,
-      IntWritable> {
+  public static class MinimumMessageCombiner
+      extends MessageCombiner<IntWritable,
+                  IntWritable> {
     @Override
     public void combine(IntWritable vertexIndex, IntWritable originalMessage,
         IntWritable messageToCombine) {
@@ -194,7 +208,8 @@ public class TestSwitchClasses {
     }
   }
 
-  public static class SumCombiner extends Combiner<IntWritable, IntWritable> {
+  public static class SumMessageCombiner
+      extends MessageCombiner<IntWritable, IntWritable> {
     @Override
     public void combine(IntWritable vertexIndex, IntWritable originalMessage,
         IntWritable messageToCombine) {
@@ -232,6 +247,12 @@ public class TestSwitchClasses {
     }
 
     @Override
+    public String toString() {
+      return "(computations=" + computations +
+          ",messagesReceived=" + messagesReceived + ")";
+    }
+
+    @Override
     public void write(DataOutput dataOutput) throws IOException {
       dataOutput.writeInt(computations.size());
       for (Integer computation : computations) {

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
 
b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
index 0e68b56..0dd9b9c 100644
--- 
a/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
+++ 
b/giraph-core/src/test/java/org/apache/giraph/partition/TestPartitionStores.java
@@ -199,15 +199,15 @@ public class TestPartitionStores {
     partitionStore.addPartition(createPartition(conf, 4, v7));
 
     Partition<IntWritable, IntWritable, NullWritable> partition1 =
-        partitionStore.getPartition(1);
+        partitionStore.getOrCreatePartition(1);
     partitionStore.putPartition(partition1);
     Partition<IntWritable, IntWritable, NullWritable> partition2 =
-        partitionStore.getPartition(2);
+        partitionStore.getOrCreatePartition(2);
     partitionStore.putPartition(partition2);
     Partition<IntWritable, IntWritable, NullWritable> partition3 =
         partitionStore.removePartition(3);
     Partition<IntWritable, IntWritable, NullWritable> partition4 =
-        partitionStore.getPartition(4);
+        partitionStore.getOrCreatePartition(4);
     partitionStore.putPartition(partition4);
 
     assertEquals(3, partitionStore.getNumPartitions());
@@ -215,7 +215,7 @@ public class TestPartitionStores {
     int partitionsNumber = 0;
     for (Integer partitionId : partitionStore.getPartitionIds()) {
       Partition<IntWritable, IntWritable, NullWritable> p =
-          partitionStore.getPartition(partitionId);
+          partitionStore.getOrCreatePartition(partitionId);
       partitionStore.putPartition(p);
       partitionsNumber++;
     }
@@ -225,13 +225,13 @@ public class TestPartitionStores {
     assertTrue(partitionStore.hasPartition(2));
     assertFalse(partitionStore.hasPartition(3));
     assertTrue(partitionStore.hasPartition(4));
-    partition = partitionStore.getPartition(1);
+    partition = partitionStore.getOrCreatePartition(1);
     assertEquals(3, partition.getVertexCount());
     partitionStore.putPartition(partition);
-    partition = partitionStore.getPartition(2);
+    partition = partitionStore.getOrCreatePartition(2);
     assertEquals(2, partition.getVertexCount());
     partitionStore.putPartition(partition);
-    partition = partitionStore.getPartition(4);
+    partition = partitionStore.getOrCreatePartition(4);
     assertEquals(1, partition.getVertexCount());
     assertEquals(2, partition.getEdgeCount());
     partitionStore.putPartition(partition);

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
----------------------------------------------------------------------
diff --git a/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java 
b/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
index 115da7e..5612e5f 100644
--- a/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
+++ b/giraph-examples/src/test/java/org/apache/giraph/TestBspBasic.java
@@ -19,7 +19,7 @@
 package org.apache.giraph;
 
 import org.apache.giraph.aggregators.TextAggregatorWriter;
-import org.apache.giraph.combiner.SimpleSumCombiner;
+import org.apache.giraph.combiner.SimpleSumMessageCombiner;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
@@ -290,7 +290,8 @@ public class
   }
 
   /**
-   * Run a sample BSP job locally with combiner and checkout output value.
+   * Run a sample BSP job locally with a message combiner and
+   * check the output value.
    *
    * @throws IOException
    * @throws ClassNotFoundException
@@ -302,7 +303,7 @@ public class
     GiraphConfiguration conf = new GiraphConfiguration();
     conf.setComputationClass(SimpleCombinerComputation.class);
     conf.setVertexInputFormatClass(SimpleSuperstepVertexInputFormat.class);
-    conf.setCombinerClass(SimpleSumCombiner.class);
+    conf.setMessageCombinerClass(SimpleSumMessageCombiner.class);
     GiraphJob job = prepareJob(getCallingMethodName(), conf);
     assertTrue(job.run(true));
   }

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-examples/src/test/java/org/apache/giraph/examples/ConnectedComponentsComputationTest.java
----------------------------------------------------------------------
diff --git 
a/giraph-examples/src/test/java/org/apache/giraph/examples/ConnectedComponentsComputationTest.java
 
b/giraph-examples/src/test/java/org/apache/giraph/examples/ConnectedComponentsComputationTest.java
index 7d326da..8883d6f 100644
--- 
a/giraph-examples/src/test/java/org/apache/giraph/examples/ConnectedComponentsComputationTest.java
+++ 
b/giraph-examples/src/test/java/org/apache/giraph/examples/ConnectedComponentsComputationTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.giraph.examples;
 
-import org.apache.giraph.combiner.MinimumIntCombiner;
+import org.apache.giraph.combiner.MinimumIntMessageCombiner;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.edge.ByteArrayEdges;
 import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
@@ -68,7 +68,7 @@ public class ConnectedComponentsComputationTest {
         GiraphConfiguration conf = new GiraphConfiguration();
         conf.setComputationClass(ConnectedComponentsComputation.class);
         conf.setOutEdgesClass(ByteArrayEdges.class);
-        conf.setCombinerClass(MinimumIntCombiner.class);
+        conf.setMessageCombinerClass(MinimumIntMessageCombiner.class);
         conf.setVertexInputFormatClass(IntIntNullTextInputFormat.class);
         conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-examples/src/test/java/org/apache/giraph/examples/ConnectedComponentsComputationTestInMemory.java
----------------------------------------------------------------------
diff --git 
a/giraph-examples/src/test/java/org/apache/giraph/examples/ConnectedComponentsComputationTestInMemory.java
 
b/giraph-examples/src/test/java/org/apache/giraph/examples/ConnectedComponentsComputationTestInMemory.java
index dbcd569..1bb8e94 100644
--- 
a/giraph-examples/src/test/java/org/apache/giraph/examples/ConnectedComponentsComputationTestInMemory.java
+++ 
b/giraph-examples/src/test/java/org/apache/giraph/examples/ConnectedComponentsComputationTestInMemory.java
@@ -18,7 +18,7 @@
 
 package org.apache.giraph.examples;
 
-import org.apache.giraph.combiner.MinimumIntCombiner;
+import org.apache.giraph.combiner.MinimumIntMessageCombiner;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.edge.ByteArrayEdges;
 import org.apache.giraph.graph.Vertex;
@@ -59,7 +59,7 @@ public class ConnectedComponentsComputationTestInMemory {
     GiraphConfiguration conf = new GiraphConfiguration();
     conf.setComputationClass(ConnectedComponentsComputation.class);
     conf.setOutEdgesClass(ByteArrayEdges.class);
-    conf.setCombinerClass(MinimumIntCombiner.class);
+    conf.setMessageCombinerClass(MinimumIntMessageCombiner.class);
 
     TestGraph<IntWritable, IntWritable, NullWritable> graph =
       new TestGraph<IntWritable, IntWritable, NullWritable>(conf);

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-examples/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java
----------------------------------------------------------------------
diff --git 
a/giraph-examples/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java
 
b/giraph-examples/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java
index 434c756..aa6cd8a 100644
--- 
a/giraph-examples/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java
+++ 
b/giraph-examples/src/test/java/org/apache/giraph/examples/MinimumIntCombinerTest.java
@@ -18,26 +18,27 @@
 
 package org.apache.giraph.examples;
 
-import static org.junit.Assert.assertEquals;
-
-import org.apache.giraph.combiner.Combiner;
-import org.apache.giraph.combiner.MinimumIntCombiner;
+import org.apache.giraph.combiner.MessageCombiner;
+import org.apache.giraph.combiner.MinimumIntMessageCombiner;
 import org.apache.hadoop.io.IntWritable;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+
 public class MinimumIntCombinerTest {
 
   @Test
   public void testCombiner() throws Exception {
-    Combiner<IntWritable, IntWritable> combiner =
-        new MinimumIntCombiner();
+    MessageCombiner<IntWritable, IntWritable>
+        messageCombiner =
+        new MinimumIntMessageCombiner();
 
     IntWritable vertexId = new IntWritable(1);
-    IntWritable result = combiner.createInitialMessage();
-    combiner.combine(vertexId, result, new IntWritable(39947466));
-    combiner.combine(vertexId, result, new IntWritable(199));
-    combiner.combine(vertexId, result, new IntWritable(42));
-    combiner.combine(vertexId, result, new IntWritable(19998888));
+    IntWritable result = messageCombiner.createInitialMessage();
+    messageCombiner.combine(vertexId, result, new IntWritable(39947466));
+    messageCombiner.combine(vertexId, result, new IntWritable(199));
+    messageCombiner.combine(vertexId, result, new IntWritable(42));
+    messageCombiner.combine(vertexId, result, new IntWritable(19998888));
     assertEquals(42, result.get());
   }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-examples/src/test/java/org/apache/giraph/examples/TryMultiIpcBindingPortsTest.java
----------------------------------------------------------------------
diff --git 
a/giraph-examples/src/test/java/org/apache/giraph/examples/TryMultiIpcBindingPortsTest.java
 
b/giraph-examples/src/test/java/org/apache/giraph/examples/TryMultiIpcBindingPortsTest.java
index 1323ff6..77bf3f3 100644
--- 
a/giraph-examples/src/test/java/org/apache/giraph/examples/TryMultiIpcBindingPortsTest.java
+++ 
b/giraph-examples/src/test/java/org/apache/giraph/examples/TryMultiIpcBindingPortsTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.giraph.examples;
 
-import org.apache.giraph.combiner.MinimumIntCombiner;
+import org.apache.giraph.combiner.MinimumIntMessageCombiner;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.edge.ByteArrayEdges;
@@ -72,7 +72,7 @@ public class TryMultiIpcBindingPortsTest {
         GiraphConstants.FAIL_FIRST_IPC_PORT_BIND_ATTEMPT.set(conf, true);
         conf.setComputationClass(ConnectedComponentsComputation.class);
         conf.setOutEdgesClass(ByteArrayEdges.class);
-        conf.setCombinerClass(MinimumIntCombiner.class);
+        conf.setMessageCombinerClass(MinimumIntMessageCombiner.class);
         conf.setVertexInputFormatClass(IntIntNullTextInputFormat.class);
         conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-examples/src/test/java/org/apache/giraph/vertex/TestComputationTypes.java
----------------------------------------------------------------------
diff --git 
a/giraph-examples/src/test/java/org/apache/giraph/vertex/TestComputationTypes.java
 
b/giraph-examples/src/test/java/org/apache/giraph/vertex/TestComputationTypes.java
index 4f74fcb..a481db3 100644
--- 
a/giraph-examples/src/test/java/org/apache/giraph/vertex/TestComputationTypes.java
+++ 
b/giraph-examples/src/test/java/org/apache/giraph/vertex/TestComputationTypes.java
@@ -18,7 +18,7 @@
 
 package org.apache.giraph.vertex;
 
-import org.apache.giraph.combiner.Combiner;
+import org.apache.giraph.combiner.MessageCombiner;
 import org.apache.giraph.conf.GiraphConstants;
 import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
 import org.apache.giraph.edge.ByteArrayEdges;
@@ -63,8 +63,9 @@ public class TestComputationTypes {
     /**
      * Matches the {@link GeneratedComputationMatch}
      */
-    private static class GeneratedVertexMatchCombiner extends
-        Combiner<LongWritable, FloatWritable> {
+    private static class GeneratedVertexMatchMessageCombiner
+        extends
+        MessageCombiner<LongWritable, FloatWritable> {
       @Override
       public void combine(LongWritable vertexIndex,
           FloatWritable originalMessage,
@@ -80,8 +81,9 @@ public class TestComputationTypes {
     /**
      * Mismatches the {@link GeneratedComputationMatch}
      */
-    private static class GeneratedVertexMismatchCombiner extends
-        Combiner<LongWritable, DoubleWritable> {
+    private static class GeneratedVertexMismatchMessageCombiner
+        extends
+        MessageCombiner<LongWritable, DoubleWritable> {
       @Override
       public void combine(LongWritable vertexIndex,
           DoubleWritable originalMessage,
@@ -136,8 +138,8 @@ public class TestComputationTypes {
         GiraphConstants.VERTEX_EDGES_CLASS.set(conf, ByteArrayEdges.class);
         GiraphConstants.VERTEX_INPUT_FORMAT_CLASS.set(conf,
             SimpleSuperstepVertexInputFormat.class);
-        GiraphConstants.VERTEX_COMBINER_CLASS.set(conf,
-            GeneratedVertexMatchCombiner.class);
+        GiraphConstants.MESSAGE_COMBINER_CLASS.set(conf,
+            GeneratedVertexMatchMessageCombiner.class);
       @SuppressWarnings("rawtypes")
       GiraphConfigurationValidator<?, ?, ?, ?, ?> validator =
         new GiraphConfigurationValidator(conf);
@@ -203,8 +205,8 @@ public class TestComputationTypes {
       GiraphConstants.VERTEX_EDGES_CLASS.set(conf, ByteArrayEdges.class);
       GiraphConstants.VERTEX_INPUT_FORMAT_CLASS.set(conf,
         SimpleSuperstepVertexInputFormat.class);
-      GiraphConstants.VERTEX_COMBINER_CLASS.set(conf,
-        GeneratedVertexMismatchCombiner.class);
+      GiraphConstants.MESSAGE_COMBINER_CLASS.set(conf,
+        GeneratedVertexMismatchMessageCombiner.class);
       @SuppressWarnings("rawtypes")
       GiraphConfigurationValidator<?, ?, ?, ?, ?> validator =
         new GiraphConfigurationValidator(conf);

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntIntNullVertex.java
----------------------------------------------------------------------
diff --git 
a/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntIntNullVertex.java
 
b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntIntNullVertex.java
new file mode 100644
index 0000000..cc61441
--- /dev/null
+++ 
b/giraph-hive/src/main/java/org/apache/giraph/hive/input/vertex/examples/HiveIntIntNullVertex.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.hive.input.vertex.examples;
+
+import com.facebook.hiveio.common.HiveType;
+import com.facebook.hiveio.input.HiveInputDescription;
+import com.facebook.hiveio.record.HiveReadableRecord;
+import com.facebook.hiveio.schema.HiveTableSchema;
+import com.google.common.base.Preconditions;
+import org.apache.giraph.edge.Edge;
+import org.apache.giraph.hive.common.HiveParsing;
+import org.apache.giraph.hive.input.vertex.SimpleHiveToVertex;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+
+/**
+ * Simple HiveToVertex that reads vertices with integer IDs, a
+ * default-constructed IntWritable vertex value, and edges with no values.
+ */
+public class HiveIntIntNullVertex
+    extends SimpleHiveToVertex<IntWritable, IntWritable, NullWritable> {
+  @Override public void checkInput(HiveInputDescription inputDesc,
+      HiveTableSchema schema) {
+    Preconditions.checkArgument(schema.columnType(0) == HiveType.INT);
+    Preconditions.checkArgument(schema.columnType(1) == HiveType.LIST);
+  }
+
+  @Override
+  public Iterable<Edge<IntWritable, NullWritable>> getEdges(
+      HiveReadableRecord record) {
+    return HiveParsing.parseIntNullEdges(record, 1);
+  }
+
+  @Override
+  public IntWritable getVertexId(HiveReadableRecord record) {
+    return HiveParsing.parseIntID(record, 0);
+  }
+
+  @Override
+  public IntWritable getVertexValue(HiveReadableRecord record) {
+    return new IntWritable();
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonUtils.java
----------------------------------------------------------------------
diff --git 
a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonUtils.java 
b/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonUtils.java
index 334f382..7ae8bc3 100644
--- 
a/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonUtils.java
+++ 
b/giraph-hive/src/main/java/org/apache/giraph/hive/jython/HiveJythonUtils.java
@@ -80,7 +80,7 @@ import static 
org.apache.giraph.conf.GiraphConstants.EDGE_INPUT_FORMAT_CLASS;
 import static org.apache.giraph.conf.GiraphConstants.GRAPH_TYPE_LANGUAGES;
 import static org.apache.giraph.conf.GiraphConstants.MAX_WORKERS;
 import static org.apache.giraph.conf.GiraphConstants.MIN_WORKERS;
-import static org.apache.giraph.conf.GiraphConstants.VERTEX_COMBINER_CLASS;
+import static org.apache.giraph.conf.GiraphConstants.MESSAGE_COMBINER_CLASS;
 import static org.apache.giraph.conf.GiraphConstants.VERTEX_INPUT_FORMAT_CLASS;
 import static 
org.apache.giraph.conf.GiraphConstants.VERTEX_OUTPUT_FORMAT_CLASS;
 import static 
org.apache.giraph.hive.common.GiraphHiveConstants.HIVE_EDGE_INPUT;
@@ -266,8 +266,8 @@ public class HiveJythonUtils {
 
     JythonUtils.init(conf, jythonJob.getComputation_name());
 
-    if (jythonJob.getCombiner() != null) {
-      VERTEX_COMBINER_CLASS.set(conf, jythonJob.getCombiner());
+    if (jythonJob.getMessageCombiner() != null) {
+      MESSAGE_COMBINER_CLASS.set(conf, jythonJob.getMessageCombiner());
     }
 
     conf.setInt(MIN_WORKERS, jythonJob.getWorkers());

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveVertexInputTest.java
----------------------------------------------------------------------
diff --git 
a/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveVertexInputTest.java
 
b/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveVertexInputTest.java
index c75652c..98ba014 100644
--- 
a/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveVertexInputTest.java
+++ 
b/giraph-hive/src/test/java/org/apache/giraph/hive/input/HiveVertexInputTest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.giraph.hive.input;
 
+import com.facebook.hiveio.common.HiveMetastores;
+import com.facebook.hiveio.testing.LocalHiveServer;
 import org.apache.giraph.conf.GiraphConfiguration;
 import org.apache.giraph.hive.GiraphHiveTestBase;
 import org.apache.giraph.hive.Helpers;
@@ -24,16 +26,13 @@ import 
org.apache.giraph.hive.computations.ComputationCountEdges;
 import org.apache.giraph.hive.computations.ComputationSumEdges;
 import org.apache.giraph.hive.input.vertex.HiveVertexInputFormat;
 import org.apache.giraph.hive.input.vertex.examples.HiveIntDoubleDoubleVertex;
-import org.apache.giraph.hive.input.vertex.examples.HiveIntNullNullVertex;
+import org.apache.giraph.hive.input.vertex.examples.HiveIntIntNullVertex;
 import org.apache.giraph.io.formats.IdWithValueTextOutputFormat;
 import org.apache.giraph.utils.InternalVertexRunner;
 import org.apache.thrift.TException;
 import org.junit.Before;
 import org.junit.Test;
 
-import com.facebook.hiveio.common.HiveMetastores;
-import com.facebook.hiveio.testing.LocalHiveServer;
-
 import java.io.IOException;
 import java.util.Map;
 
@@ -66,7 +65,7 @@ public class HiveVertexInputTest extends GiraphHiveTestBase {
 
     GiraphConfiguration conf = new GiraphConfiguration();
     HIVE_VERTEX_INPUT.setTable(conf, tableName);
-    HIVE_VERTEX_INPUT.setClass(conf, HiveIntNullNullVertex.class);
+    HIVE_VERTEX_INPUT.setClass(conf, HiveIntIntNullVertex.class);
     conf.setComputationClass(ComputationCountEdges.class);
     conf.setVertexInputFormatClass(HiveVertexInputFormat.class);
     conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);
@@ -99,7 +98,7 @@ public class HiveVertexInputTest extends GiraphHiveTestBase {
     GiraphConfiguration conf = new GiraphConfiguration();
     HIVE_VERTEX_INPUT.setTable(conf, tableName);
     HIVE_VERTEX_INPUT.setPartition(conf, partition);
-    HIVE_VERTEX_INPUT.setClass(conf, HiveIntNullNullVertex.class);
+    HIVE_VERTEX_INPUT.setClass(conf, HiveIntIntNullVertex.class);
     conf.setComputationClass(ComputationCountEdges.class);
     conf.setVertexInputFormatClass(HiveVertexInputFormat.class);
     conf.setVertexOutputFormatClass(IdWithValueTextOutputFormat.class);

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/giraph-hive/src/test/resources/org/apache/giraph/jython/count-edges-launcher.py
----------------------------------------------------------------------
diff --git 
a/giraph-hive/src/test/resources/org/apache/giraph/jython/count-edges-launcher.py
 
b/giraph-hive/src/test/resources/org/apache/giraph/jython/count-edges-launcher.py
index a87f030..f5cf685 100644
--- 
a/giraph-hive/src/test/resources/org/apache/giraph/jython/count-edges-launcher.py
+++ 
b/giraph-hive/src/test/resources/org/apache/giraph/jython/count-edges-launcher.py
@@ -14,7 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from org.apache.giraph.combiner import DoubleSumCombiner
+from org.apache.giraph.combiner import DoubleSumMessageCombiner
 from org.apache.giraph.edge import ByteArrayEdges
 from org.apache.giraph.jython import JythonJob
 from org.apache.hadoop.io import IntWritable

http://git-wip-us.apache.org/repos/asf/giraph/blob/a95066cd/src/site/xdoc/quick_start.xml
----------------------------------------------------------------------
diff --git a/src/site/xdoc/quick_start.xml b/src/site/xdoc/quick_start.xml
index 5df6555..4c14c75 100644
--- a/src/site/xdoc/quick_start.xml
+++ b/src/site/xdoc/quick_start.xml
@@ -224,7 +224,7 @@ usage: org.apache.giraph.utils.ConfigurationUtils [-aw 
&lt;arg&gt;] [-c &lt;arg&
        &lt;arg&gt;] [-vip &lt;arg&gt;] [-vvf &lt;arg&gt;] [-w &lt;arg&gt;] 
[-wc &lt;arg&gt;] [-yh &lt;arg&gt;]
        [-yj &lt;arg&gt;]
  -aw,--aggregatorWriter &lt;arg&gt;           AggregatorWriter class
- -c,--combiner &lt;arg&gt;                    Combiner class
+ -c,--messageCombiner &lt;arg&gt;             Message combiner class
  -ca,--customArguments &lt;arg&gt;            provide custom arguments for the
                                         job configuration in the form: -ca
                                         
&lt;param1&gt;=&lt;value1&gt;,&lt;param2&gt;=&lt;value2&gt;

Reply via email to